unofficial mirror of meta@public-inbox.org
* [PATCH 0/2] lei Maildir inotify/kevent support
@ 2021-07-19  8:59 Eric Wong
  2021-07-19  8:59 ` [PATCH 1/2] config: s/_one_val/get_1/ for public use Eric Wong
                   ` (2 more replies)
  0 siblings, 3 replies; 4+ messages in thread
From: Eric Wong @ 2021-07-19  8:59 UTC
  To: meta

This is one of the most important features of lei, and the one
that drove the decision towards its daemonic architecture.

My brain still has some wires loose or crossed :<

So I've had a very tough time mapping out the internal design
for this that could deal with rename() storms and reduce the
need to handle IN_Q_OVERFLOW conditions.  Eventually,
introducing a new concept in the form of LeiNoteEvent helped my
crippled brain make the necessary connections and cobble
together something that seems halfway working.

LeiNoteEvent represents a significant conceptual shift in lei
internals.  Having long-lived workers is closer to my original
vision of how lei internals would've been.  That old model had
some major weaknesses, however:

1) error handling/reporting back to the CLI required FD passing
   (and large-scale FD passing got confusing pretty quick)

2) excessive FD passing overflowed default kernel buffers
   and made parallelism difficult for unprivileged users

Since there's no CLI to report errors to with internal
notifications, we don't have FDs to pass.  Of course, that
means we can lose errors right now, but it's probably not
a big deal for (mostly) innocuous keyword changes.
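
To illustrate how a rename() storm can be absorbed: patch 2/2
batches writes and flushes them from a timer rather than acting on
every event.  Below is a minimal, self-contained sketch of that idea
only.  The names note_event_sketch and flush_sketch, the paths, and
the explicit flush calls standing in for a timer are all assumptions
made for illustration; this is not the code in LeiNoteEvent.pm.

```perl
use strict;
use warnings;

my %to_flush; # cfgpath => number of events noted since the last flush
my @flushed;  # record of flushes, kept only for this demonstration

# called once per filesystem event; cheap, it only marks work as pending
sub note_event_sketch {
	my ($cfgpath) = @_;
	$to_flush{$cfgpath}++;
}

# a real daemon would call this from a timer (hypothetical here);
# it does one flush per config file, however many events were noted
sub flush_sketch {
	my %todo = %to_flush;
	%to_flush = ();
	for my $cfgpath (sort keys %todo) {
		push @flushed, "$cfgpath ($todo{$cfgpath} events)";
	}
}

# simulate a rename() storm against one config, one event on another
note_event_sketch('/home/u/.config/lei/config') for 1..1000;
note_event_sketch('/home/v/.config/lei/config');
flush_sketch();
flush_sketch(); # nothing is pending, so nothing is recorded
print "$_\n" for @flushed;
```

A thousand events against one config file end up as a single flush,
which is the property that matters when an MUA renames many files
at once.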

Eric Wong (2):
  config: s/_one_val/get_1/ for public use
  lei: start implementing inotify Maildir support

 MANIFEST                         |   6 ++
 lib/PublicInbox/Config.pm        |   6 +-
 lib/PublicInbox/DirIdle.pm       |  17 +++++
 lib/PublicInbox/LEI.pm           | 116 ++++++++++++++++++++++++++---
 lib/PublicInbox/LeiAddWatch.pm   |  41 ++++++++++
 lib/PublicInbox/LeiInput.pm      |   1 +
 lib/PublicInbox/LeiLsWatch.pm    |  15 ++++
 lib/PublicInbox/LeiNoteEvent.pm  | 124 +++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiSearch.pm     |   2 +-
 lib/PublicInbox/LeiSelfSocket.pm |  45 +++++++++++
 lib/PublicInbox/LeiWatch.pm      |  13 ++++
 t/lei-watch.t                    |  49 ++++++++++++
 12 files changed, 419 insertions(+), 16 deletions(-)
 create mode 100644 lib/PublicInbox/LeiAddWatch.pm
 create mode 100644 lib/PublicInbox/LeiLsWatch.pm
 create mode 100644 lib/PublicInbox/LeiNoteEvent.pm
 create mode 100644 lib/PublicInbox/LeiSelfSocket.pm
 create mode 100644 lib/PublicInbox/LeiWatch.pm
 create mode 100644 t/lei-watch.t


* [PATCH 1/2] config: s/_one_val/get_1/ for public use
  2021-07-19  8:59 [PATCH 0/2] lei Maildir inotify/kevent support Eric Wong
@ 2021-07-19  8:59 ` Eric Wong
  2021-07-19  8:59 ` [PATCH 2/2] lei: start implementing inotify Maildir support Eric Wong
  2021-07-21 14:07 ` [PATCH 3/2] lei: auto-refresh watches in config, cancel missing Eric Wong
  2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2021-07-19  8:59 UTC
  To: meta

We'll be using this in lei for watch configs.
---
 lib/PublicInbox/Config.pm | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
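
For readers unfamiliar with the helper, here is a minimal sketch of
how a single-value lookup with this calling convention can behave.
The tail of the sub is not shown in the hunk below, so the handling
of repeated keys (array ref, last value wins, as with git-config(1))
is an assumption.  MyCfg and its contents are hypothetical stand-ins,
not PublicInbox::Config.

```perl
use strict;
use warnings;

package MyCfg; # hypothetical stand-in, not PublicInbox::Config

sub new { my ($cls, %kv) = @_; bless({ %kv }, $cls) }

# return one scalar for "$pfx.$k", or nothing if the key is unset
sub get_1 {
	my ($self, $pfx, $k) = @_;
	my $v = $self->{"$pfx.$k"} // return;
	return $v if !ref($v);
	$v->[-1]; # assumption: last value wins for multi-valued keys
}

package main;

my $cfg = MyCfg->new(
	'watch.maildir:/tmp/md.state' => 'tag-ro',
	'watch.maildir:/tmp/md2.state' => [ 'pause', 'import-rw' ],
);
my $one = $cfg->get_1('watch.maildir:/tmp/md', 'state');
my $last = $cfg->get_1('watch.maildir:/tmp/md2', 'state');
my $none = $cfg->get_1('watch.maildir:/nonexistent', 'state');
print "$one $last ", (defined($none) ? 'set' : 'unset'), "\n";
```

Callers can chain "// next" or "// return" onto the result, which is
how the call sites in the diff below use it.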

diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm
index 36f2fafb..8e46328d 100644
--- a/lib/PublicInbox/Config.pm
+++ b/lib/PublicInbox/Config.pm
@@ -384,7 +384,7 @@ sub rel2abs_collapsed {
 	Cwd::abs_path($p);
 }
 
-sub _one_val {
+sub get_1 {
 	my ($self, $pfx, $k) = @_;
 	my $v = $self->{"$pfx.$k"} // return;
 	return $v if !ref($v);
@@ -430,7 +430,7 @@ sub _fill_ibx {
 	}
 	for my $k (qw(filter inboxdir newsgroup replyto httpbackendmax feedmax
 			indexlevel indexsequentialshard)) {
-		my $v = _one_val($self, $pfx, $k) // next;
+		my $v = get_1($self, $pfx, $k) // next;
 		$ibx->{$k} = $v;
 	}
 
@@ -522,7 +522,7 @@ sub _fill_ei ($$) {
 	}
 	my $es = PublicInbox::ExtSearch->new($d);
 	for my $k (qw(indexlevel indexsequentialshard)) {
-		my $v = _one_val($self, $pfx, $k) // next;
+		my $v = get_1($self, $pfx, $k) // next;
 		$es->{$k} = $v;
 	}
 	for my $k (qw(altid coderepo hide url infourl)) {


* [PATCH 2/2] lei: start implementing inotify Maildir support
  2021-07-19  8:59 [PATCH 0/2] lei Maildir inotify/kevent support Eric Wong
  2021-07-19  8:59 ` [PATCH 1/2] config: s/_one_val/get_1/ for public use Eric Wong
@ 2021-07-19  8:59 ` Eric Wong
  2021-07-21 14:07 ` [PATCH 3/2] lei: auto-refresh watches in config, cancel missing Eric Wong
  2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2021-07-19  8:59 UTC
  To: meta

This allows lei to automatically note keyword (message flag)
changes made to a Maildir and propagate them into lei/store:

	lei add-watch --state=tag-ro /path/to/Maildir

This doesn't persist across restarts, yet.  In the future,
it will be applied automatically to "lei q" output Maildirs
by default (with an option to disable it).

State values of tag-rw, index-<ro|rw>, import-<ro|rw> will all
be supported for Maildir.

This is a major and fairly intrusive internal change, but the
whole point of the daemon-oriented design was to make it
possible to automatically monitor (and propagate) Maildir/IMAP
flag changes.
---
 MANIFEST                         |   6 ++
 lib/PublicInbox/DirIdle.pm       |  17 +++++
 lib/PublicInbox/LEI.pm           | 116 ++++++++++++++++++++++++++---
 lib/PublicInbox/LeiAddWatch.pm   |  41 ++++++++++
 lib/PublicInbox/LeiInput.pm      |   1 +
 lib/PublicInbox/LeiLsWatch.pm    |  15 ++++
 lib/PublicInbox/LeiNoteEvent.pm  | 124 +++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiSearch.pm     |   2 +-
 lib/PublicInbox/LeiSelfSocket.pm |  45 +++++++++++
 lib/PublicInbox/LeiWatch.pm      |  13 ++++
 t/lei-watch.t                    |  49 ++++++++++++
 11 files changed, 416 insertions(+), 13 deletions(-)
 create mode 100644 lib/PublicInbox/LeiAddWatch.pm
 create mode 100644 lib/PublicInbox/LeiLsWatch.pm
 create mode 100644 lib/PublicInbox/LeiNoteEvent.pm
 create mode 100644 lib/PublicInbox/LeiSelfSocket.pm
 create mode 100644 lib/PublicInbox/LeiWatch.pm
 create mode 100644 t/lei-watch.t
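
The state values accepted by --state ("pause", plus import, index or
tag with an -ro or -rw suffix) can be checked with one anchored
pattern.  A minimal sketch follows; state_ok_sketch is a hypothetical
name and not the sub added to LEI.pm.  The outer group is there so
that both anchors apply to every alternative.

```perl
use strict;
use warnings;

# hypothetical helper: accepts "pause" or <import|index|tag>-<ro|rw>
sub state_ok_sketch {
	my ($state) = @_;
	# the outer (?:...) makes \A and \z apply to both alternatives
	$state =~ /\A(?:pause|(?:import|index|tag)-(?:ro|rw))\z/ ? 1 : 0;
}

my @try = qw(pause tag-ro tag-rw index-ro import-rw tag-rx paused xtag-ro);
my %ok = map { $_ => state_ok_sketch($_) } @try;
print "$_: ", ($ok{$_} ? 'ok' : 'rejected'), "\n" for @try;
```

Without the outer group, alternation would split the pattern into
"\Apause" and "...\z", so strings such as "paused" or "xtag-ro"
would be accepted.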

diff --git a/MANIFEST b/MANIFEST
index 146a32ab..1d79b7c9 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -198,6 +198,7 @@ lib/PublicInbox/Isearch.pm
 lib/PublicInbox/KQNotify.pm
 lib/PublicInbox/LEI.pm
 lib/PublicInbox/LeiALE.pm
+lib/PublicInbox/LeiAddWatch.pm
 lib/PublicInbox/LeiAuth.pm
 lib/PublicInbox/LeiBlob.pm
 lib/PublicInbox/LeiConvert.pm
@@ -220,8 +221,10 @@ lib/PublicInbox/LeiLsLabel.pm
 lib/PublicInbox/LeiLsMailSource.pm
 lib/PublicInbox/LeiLsMailSync.pm
 lib/PublicInbox/LeiLsSearch.pm
+lib/PublicInbox/LeiLsWatch.pm
 lib/PublicInbox/LeiMailSync.pm
 lib/PublicInbox/LeiMirror.pm
+lib/PublicInbox/LeiNoteEvent.pm
 lib/PublicInbox/LeiOverview.pm
 lib/PublicInbox/LeiP2q.pm
 lib/PublicInbox/LeiPmdir.pm
@@ -232,6 +235,7 @@ lib/PublicInbox/LeiRemote.pm
 lib/PublicInbox/LeiRm.pm
 lib/PublicInbox/LeiSavedSearch.pm
 lib/PublicInbox/LeiSearch.pm
+lib/PublicInbox/LeiSelfSocket.pm
 lib/PublicInbox/LeiStore.pm
 lib/PublicInbox/LeiStoreErr.pm
 lib/PublicInbox/LeiSucks.pm
@@ -239,6 +243,7 @@ lib/PublicInbox/LeiTag.pm
 lib/PublicInbox/LeiToMail.pm
 lib/PublicInbox/LeiUp.pm
 lib/PublicInbox/LeiViewText.pm
+lib/PublicInbox/LeiWatch.pm
 lib/PublicInbox/LeiXSearch.pm
 lib/PublicInbox/Linkify.pm
 lib/PublicInbox/Listener.pm
@@ -434,6 +439,7 @@ t/lei-q-save.t
 t/lei-q-thread.t
 t/lei-sigpipe.t
 t/lei-tag.t
+t/lei-watch.t
 t/lei.t
 t/lei_dedupe.t
 t/lei_external.t
diff --git a/lib/PublicInbox/DirIdle.pm b/lib/PublicInbox/DirIdle.pm
index 5142d005..7031e5fd 100644
--- a/lib/PublicInbox/DirIdle.pm
+++ b/lib/PublicInbox/DirIdle.pm
@@ -53,6 +53,23 @@ sub new {
 	$self;
 }
 
+sub add_watches {
+	my ($self, $dirs, $gone) = @_;
+	my $fl = $MAIL_IN | ($gone ? $MAIL_GONE : 0);
+	for my $d (@$dirs) {
+		$self->{inot}->watch($d, $fl);
+	}
+	PublicInbox::FakeInotify::poll_once($self) if !$ino_cls;
+}
+
+sub rm_watches {
+	my ($self, $dir) = @_;
+	my $inot = $self->{inot};
+	if (my $cb = $inot->can('rm_watches')) { # TODO for fake watchers
+		$cb->($inot, $dir);
+	}
+}
+
 sub event_step {
 	my ($self) = @_;
 	my $cb = $self->{cb};
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index a9f5edae..b92d7512 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -28,14 +28,15 @@ use Time::HiRes qw(stat); # ctime comparisons for config cache
 use File::Path qw(mkpath);
 use File::Spec;
 our $quit = \&CORE::exit;
-our ($current_lei, $errors_log, $listener, $oldset, $dir_idle);
-my ($recv_cmd, $send_cmd);
+our ($current_lei, $errors_log, $listener, $oldset, $dir_idle,
+	$recv_cmd, $send_cmd);
 my $GLP = Getopt::Long::Parser->new;
 $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
 my $GLP_PASS = Getopt::Long::Parser->new;
 $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through));
 
 our %PATH2CFG; # persistent for socket daemon
+our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => undef }
 
 # TBD: this is a documentation mechanism to show a subcommand
 # (may) pass options through to another command:
@@ -232,10 +233,9 @@ our %CMD = ( # sorted in order of importance/use:
 	qw(exact! all jobs:i indexed), @c_opt ],
 
 'add-watch' => [ 'LOCATION', 'watch for new messages and flag changes',
-	qw(import! kw! interval=s recursive|r
-	exclude=s include=s), @c_opt ],
+	qw(poll-interval=s state=s recursive|r), @c_opt ],
 'ls-watch' => [ '[FILTER...]', 'list active watches with numbers and status',
-		qw(format|f=s z), @c_opt ],
+		qw(l z|0), @c_opt ],
 'pause-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote), @c_opt ],
 'resume-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote), @c_opt ],
 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
@@ -391,6 +391,7 @@ my %OPTDESC = (
 'format|f=s	ls-search' => ['OUT|json|jsonl|concatjson',
 			'listing output format' ],
 'l	ls-search' => 'long listing format',
+'l	ls-watch' => 'long listing format',
 'l	ls-mail-source' => 'long listing format',
 'url	ls-mail-source' => 'show full URL of newsgroup or IMAP folder',
 'format|f=s	ls-external' => $ls_format,
@@ -435,7 +436,7 @@ my %CONFIG_KEYS = (
 	'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m ikw pmd wq1); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne); # internal workers
 
 sub _drop_wq {
 	my ($self) = @_;
@@ -538,7 +539,7 @@ sub _lei_atfork_child {
 		chdir '/' or die "chdir(/): $!";
 		close($_) for (grep(defined, delete @$self{qw(0 1 2 sock)}));
 		if (my $cfg = $self->{cfg}) {
-			delete $cfg->{-lei_store};
+			delete @$cfg{qw(-lei_store -watches -lei_note_event)};
 		}
 	} else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly
 		open STDERR, '+>&='.fileno($self->{2}) or warn "open $!";
@@ -555,6 +556,8 @@ sub _lei_atfork_child {
 	undef $listener;
 	undef $dir_idle;
 	%PATH2CFG = ();
+	$MDIR2CFGPATH = {};
+	eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush';
 	undef $errors_log;
 	$quit = \&CORE::exit;
 	$self->{-eml_noisy} or # only "lei import" sets this atm
@@ -781,10 +784,12 @@ sub _lei_cfg ($;$) {
 	my $f = _config_path($self);
 	my @st = stat($f);
 	my $cur_st = @st ? pack('dd', $st[10], $st[7]) : ''; # 10:ctime, 7:size
-	my ($sto, $sto_dir);
+	my ($sto, $sto_dir, $watches, $lne);
 	if (my $cfg = $PATH2CFG{$f}) { # reuse existing object in common case
 		return ($self->{cfg} = $cfg) if $cur_st eq $cfg->{-st};
-		($sto, $sto_dir) = @$cfg{qw(-lei_store leistore.dir)};
+		($sto, $sto_dir, $watches, $lne) =
+				@$cfg{qw(-lei_store leistore.dir -watches
+					-lei_note_event)};
 	}
 	if (!@st) {
 		unless ($creat) {
@@ -805,6 +810,8 @@ sub _lei_cfg ($;$) {
 			eq File::Spec->canonpath($cfg->{'leistore.dir'} //
 						store_path($self))) {
 		$cfg->{-lei_store} = $sto;
+		$cfg->{-lei_note_event} = $lne;
+		$cfg->{-watches} = $watches if $watches;
 	}
 	if (scalar(keys %PATH2CFG) > 5) {
 		# FIXME: use inotify/EVFILT_VNODE to detect unlinked configs
@@ -817,7 +824,7 @@ sub _lei_cfg ($;$) {
 
 sub _lei_store ($;$) {
 	my ($self, $creat) = @_;
-	my $cfg = _lei_cfg($self, $creat);
+	my $cfg = _lei_cfg($self, $creat) // return;
 	$cfg->{-lei_store} //= do {
 		require PublicInbox::LeiStore;
 		my $dir = $cfg->{'leistore.dir'} // store_path($self);
@@ -1126,6 +1133,53 @@ sub dump_and_clear_log {
 	}
 }
 
+sub cfg2lei ($) {
+	my ($cfg) = @_;
+	my $lei = bless { env => { %{$cfg->{-env}} } }, __PACKAGE__;
+	open($lei->{0}, '<&', \*STDIN) or die "dup 0: $!";
+	open($lei->{1}, '>>&', \*STDOUT) or die "dup 1: $!";
+	open($lei->{2}, '>>&', \*STDERR) or die "dup 2: $!";
+	open($lei->{3}, '/') or die "open /: $!";
+	chdir($lei->{3}) or die "chdir /': $!";
+	my ($x, $y);
+	socketpair($x, $y, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
+	$lei->{sock} = $x;
+	require PublicInbox::LeiSelfSocket;
+	PublicInbox::LeiSelfSocket->new($y); # adds to event loop
+	$lei;
+}
+
+sub dir_idle_handler ($) { # PublicInbox::DirIdle callback
+	my ($ev) = @_; # Linux::Inotify2::Event or duck type
+	my $fn = $ev->fullname;
+	if ($fn =~ m!\A(.+)/(new|cur)/([^/]+)\z!) { # Maildir file
+		my ($mdir, $nc, $bn) = ($1, $2, $3);
+		$nc = '' if $ev->IN_DELETE;
+		for my $f (keys %{$MDIR2CFGPATH->{$mdir} // {}}) {
+			my $cfg = $PATH2CFG{$f} // next;
+			eval {
+				local %ENV = %{$cfg->{-env}};
+				my $lei = cfg2lei($cfg);
+				$lei->dispatch('note-event',
+						"maildir:$mdir", $nc, $bn, $fn);
+			};
+			warn "E note-event $f: $@\n" if $@;
+		}
+	}
+	if ($ev->can('cancel') && ($ev->IN_IGNORE || $ev->IN_UNMOUNT)) {
+		$ev->cancel;
+	}
+	if ($fn =~ m!\A(.+)/(?:new|cur)\z! && !-e $fn) {
+		delete $MDIR2CFGPATH->{$1};
+	}
+	if (!-e $fn) { # config file or Maildir gone
+		for my $cfgpaths (values %$MDIR2CFGPATH) {
+			delete $cfgpaths->{$fn};
+		}
+		delete $PATH2CFG{$fn};
+	}
+}
+
 # lei(1) calls this when it can't connect
 sub lazy_start {
 	my ($path, $errno, $narg) = @_;
@@ -1175,6 +1229,7 @@ sub lazy_start {
 	return if $pid;
 	$0 = "lei-daemon $path";
 	local %PATH2CFG;
+	local $MDIR2CFGPATH;
 	$listener->blocking(0);
 	my $exit_code;
 	my $pil = PublicInbox::Listener->new($listener, \&accept_dispatch);
@@ -1204,8 +1259,8 @@ sub lazy_start {
 	local $SIG{PIPE} = 'IGNORE';
 	require PublicInbox::DirIdle;
 	local $dir_idle = PublicInbox::DirIdle->new([$sock_dir], sub {
-		# just rely on wakeup ot hit PostLoopCallback set below
-		_dir_idle_handler(@_) if $_[0]->fullname ne $path;
+		# just rely on wakeup to hit PostLoopCallback set below
+		dir_idle_handler($_[0]) if $_[0]->fullname ne $path;
 	}, 1);
 	if ($sigfd) {
 		undef $sigfd; # unref, already in DS::DescriptorMap
@@ -1293,4 +1348,41 @@ sub wq_eof { # EOF callback for main daemon
 	$wq1->wq_wait_old(\&wq_done_wait, $lei);
 }
 
+sub watch_state_ok ($) {
+	my ($state) = $_[-1]; # $_[0] may be $self
+	$state =~ /\Apause|(?:import|index|tag)-(?:ro|rw)\z/;
+}
+
+sub refresh_watches {
+	my ($lei) = @_;
+	my $cfg = _lei_cfg($lei) or return;
+	$cfg->{-env} //= { %{$lei->{env}}, PWD => '/' }; # for cfg2lei
+	my $watches = $cfg->{-watches} //= {};
+	require PublicInbox::LeiWatch;
+	for my $w (grep(/\Awatch\..+\.state\z/, keys %$cfg)) {
+		my $url = substr($w, length('watch.'), -length('.state'));
+		my $lw = $watches->{$w} //= PublicInbox::LeiWatch->new($url);
+		my $state = $cfg->get_1("watch.$url", 'state');
+		if (!watch_state_ok($state)) {
+			$lei->err("watch.$url.state=$state not supported");
+			next;
+		}
+		my $f = $cfg->{'-f'};
+		if ($url =~ /\Amaildir:(.+)/i) {
+			my $d = File::Spec->canonpath($1);
+			if ($state eq 'pause') {
+				delete $MDIR2CFGPATH->{$d}->{$f};
+				scalar(keys %{$MDIR2CFGPATH->{$d}}) or
+					delete $MDIR2CFGPATH->{$d};
+			} elsif (!exists($MDIR2CFGPATH->{$d}->{$f})) {
+				$dir_idle->add_watches(["$d/cur", "$d/new"], 1);
+				$MDIR2CFGPATH->{$d}->{$f} = undef;
+			}
+		} else { # TODO: imap/nntp/jmap
+			$lei->child_error(1,
+				"E: watch $url not supported, yet");
+		}
+	}
+}
+
 1;
diff --git a/lib/PublicInbox/LeiAddWatch.pm b/lib/PublicInbox/LeiAddWatch.pm
new file mode 100644
index 00000000..671d54f9
--- /dev/null
+++ b/lib/PublicInbox/LeiAddWatch.pm
@@ -0,0 +1,41 @@
+# Copyright all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# "lei add-watch" command
+package PublicInbox::LeiAddWatch;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::LeiInput);
+
+sub lei_add_watch {
+	my ($lei, @argv) = @_;
+	my $cfg = $lei->_lei_cfg(1);
+	my $self = bless {}, __PACKAGE__;
+	$lei->{opt}->{'mail-sync'} = 1; # for prepare_inputs
+	my $state = $lei->{opt}->{'state'} // 'import-rw';
+	$lei->watch_state_ok($state) or
+		return $lei->fail("invalid state: $state");
+	my $vmd_mod = $self->vmd_mod_extract(\@argv);
+	return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
+	$self->prepare_inputs($lei, \@argv) or return;
+	my @vmd;
+	while (my ($type, $vals) = each %$vmd_mod) {
+		push @vmd, "$type:$_" for @$vals;
+	}
+	my $vmd0 = shift @vmd;
+	for my $w (@{$self->{inputs}}) {
+		# clobber existing, allow multiple
+		if (defined($vmd0)) {
+			$lei->_config("watch.$w.vmd", '--replace-all', $vmd0);
+			for my $v (@vmd) {
+				$lei->_config("watch.$w.vmd", $v);
+			}
+		}
+		next if defined $cfg->{"watch.$w.state"};
+		$lei->_config("watch.$w.state", $state);
+	}
+	delete $lei->{cfg}; # force reload
+	$lei->refresh_watches;
+}
+
+1;
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index de2a8ff1..fa330df5 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -372,6 +372,7 @@ sub input_only_net_merge_all_done {
 
 # like Getopt::Long, but for +kw:FOO and -kw:FOO to prepare
 # for update_xvmd -> update_vmd
+# returns something like { "+L" => [ @Labels ], ... }
 sub vmd_mod_extract {
 	my $argv = $_[-1];
 	my $vmd_mod = {};
diff --git a/lib/PublicInbox/LeiLsWatch.pm b/lib/PublicInbox/LeiLsWatch.pm
new file mode 100644
index 00000000..f96dc4ec
--- /dev/null
+++ b/lib/PublicInbox/LeiLsWatch.pm
@@ -0,0 +1,15 @@
+# Copyright all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+package PublicInbox::LeiLsWatch;
+use strict;
+use v5.10.1;
+
+sub lei_ls_watch {
+	my ($lei) = @_;
+	my $cfg = $lei->_lei_cfg or return;
+	my @w = (join("\n", keys %$cfg) =~ m/^watch\.(.+?)\.state$/sgm);
+	$lei->puts(join("\n", @w)) if @w;
+}
+
+1;
diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm
new file mode 100644
index 00000000..bf15cd26
--- /dev/null
+++ b/lib/PublicInbox/LeiNoteEvent.pm
@@ -0,0 +1,124 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# internal command for dealing with inotify, kqueue vnodes, etc
+package PublicInbox::LeiNoteEvent;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+
+my $flush_timer;
+our $to_flush; # { cfgpath => $lei }
+
+sub flush_lei ($) {
+	my ($lei) = @_;
+	if (my $lne = delete $lei->{cfg}->{-lei_note_event}) {
+		$lne->wq_close(1, undef, $lei); # runs _lei_wq_eof;
+	} else { # lms_clear_src calls only:
+		my $wait = $lei->{sto}->ipc_do('done');
+	}
+}
+
+# we batch up writes and flush every 5s (matching Linux default
+# writeback behavior) since MUAs can trigger a storm of inotify events
+sub flush_task { # PublicInbox::DS timer callback
+	undef $flush_timer;
+	my $todo = $to_flush // return;
+	$to_flush = undef;
+	for my $lei (values %$todo) { flush_lei($lei) }
+}
+
+# sets a timer to flush
+sub note_event_arm_done ($) {
+	my ($lei) = @_;
+	$flush_timer //= PublicInbox::DS::add_timer(5, \&flush_task);
+	$to_flush->{$lei->{cfg}->{'-f'}} //= $lei;
+}
+
+sub eml_event ($$$$) {
+	my ($self, $eml, $kw, $state) = @_;
+	my $sto = $self->{lei}->{sto};
+	my $lse = $self->{lse} //= $sto->search;
+	my $vmd = { kw => $kw };
+	if ($state =~ /\Aimport-(?:rw|ro)\z/) {
+		$sto->ipc_do('set_eml', $eml, $vmd);
+	} elsif ($state =~ /\Aindex-(?:rw|ro)\z/) {
+		my $xoids = $self->{lei}->ale->xoids_for($eml);
+		$sto->ipc_do('index_eml_only', $eml, $vmd, $xoids);
+	} elsif ($state =~ /\Atag-(?:rw|ro)\z/) {
+		my $c = $lse->kw_changed($eml, $kw, my $docids = []);
+		if (scalar @$docids) { # already in lei/store
+			$sto->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c;
+		} elsif (my $xoids = $self->{lei}->ale->xoids_for($eml)) {
+			# it's in an external, only set kw, here
+			$sto->ipc_do('set_xvmd', $xoids, $eml, $vmd);
+		} # else { totally unknown
+	} else {
+		warn "unknown state: $state (in $self->{lei}->{cfg}->{'-f'})\n";
+	}
+}
+
+sub maildir_event { # via wq_io_do
+	my ($self, $fn, $kw, $state) = @_;
+	my $eml = PublicInbox::InboxWritable::eml_from_path($fn) // return;
+	eml_event($self, $eml, $kw, $state);
+}
+
+sub lei_note_event {
+	my ($lei, $folder, $new_cur, $bn, $fn, @rest) = @_;
+	die "BUG: unexpected: @rest" if @rest;
+	my $cfg = $lei->_lei_cfg or return; # gone (race)
+	my $sto = $lei->_lei_store or return; # gone
+	return flush_lei($lei) if $folder eq 'done'; # special case
+	my $lms = $sto->search->lms or return;
+	my $err = $lms->arg2folder($lei, [ $folder ]);
+	return if $err->{fail};
+	undef $lms;
+	my $state = $cfg->get_1("watch.$folder", 'state') // 'pause';
+	return if $state eq 'pause';
+	$lei->ale; # prepare
+	$sto->write_prepare($lei);
+	if ($new_cur eq '') {
+		$sto->ipc_do('lms_clear_src', $folder, \$bn);
+		return note_event_arm_done($lei);
+	}
+	require PublicInbox::MdirReader;
+	my $self = $cfg->{-lei_note_event} //= do {
+		my $wq = bless {}, __PACKAGE__;
+		# MUAs such as mutt can trigger massive rename() storms so
+		# use all CPU power available:
+		my $jobs = $wq->detect_nproc // 1;
+		my ($op_c, $ops) = $lei->workers_start($wq, $jobs);
+		$lei->wait_wq_events($op_c, $ops);
+		note_event_arm_done($lei);
+		$lei->{lne} = $wq;
+	};
+	if ($folder =~ /\Amaildir:/i) {
+		my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn)
+			// return;
+		return if index($fl, 'T') >= 0;
+		my $kw = PublicInbox::MdirReader::flags2kw($fl);
+		$self->wq_io_do('maildir_event', [], $fn, $kw, $state);
+	} # else: TODO: imap
+}
+
+sub ipc_atfork_child {
+	my ($self) = @_;
+	$self->{lei}->_lei_atfork_child(1); # persistent, for a while
+	$self->SUPER::ipc_atfork_child;
+}
+
+sub lne_done_wait {
+	my ($arg, $pid) = @_;
+	my ($self, $lei) = @$arg;
+	$lei->can('wq_done_wait')->($arg, $pid);
+}
+
+sub _lei_wq_eof { # EOF callback for main lei daemon
+	my ($lei) = @_;
+	my $lne = delete $lei->{lne} or return $lei->fail;
+	my $wait = $lei->{sto}->ipc_do('done');
+	$lne->wq_wait_old(\&lne_done_wait, $lei);
+}
+
+1;
diff --git a/lib/PublicInbox/LeiSearch.pm b/lib/PublicInbox/LeiSearch.pm
index 06ea6299..37bfc65e 100644
--- a/lib/PublicInbox/LeiSearch.pm
+++ b/lib/PublicInbox/LeiSearch.pm
@@ -5,7 +5,7 @@
 package PublicInbox::LeiSearch;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::ExtSearch);
+use parent qw(PublicInbox::ExtSearch); # PublicInbox::Search->reopen
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::ContentHash qw(content_digest content_hash);
 use PublicInbox::MID qw(mids mids_for_index);
diff --git a/lib/PublicInbox/LeiSelfSocket.pm b/lib/PublicInbox/LeiSelfSocket.pm
new file mode 100644
index 00000000..3d847649
--- /dev/null
+++ b/lib/PublicInbox/LeiSelfSocket.pm
@@ -0,0 +1,45 @@
+# Copyright all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# dummy placeholder socket for internal lei commands.
+# This receives what script/lei receives, but isn't connected
+# to an interactive terminal so I'm not sure what to do with it...
+package PublicInbox::LeiSelfSocket;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::DS);
+use Data::Dumper;
+$Data::Dumper::Useqq = 1; # should've been the Perl default :P
+use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
+use PublicInbox::Spawn;
+my $recv_cmd;
+
+sub new {
+	my ($cls, $r) = @_;
+	my $self = bless { sock => $r }, $cls;
+	$r->blocking(0);
+	no warnings 'once';
+	$recv_cmd = $PublicInbox::LEI::recv_cmd;
+	$self->SUPER::new($r, EPOLLIN|EPOLLET);
+}
+
+sub event_step {
+	my ($self) = @_;
+	while (1) {
+		my (@fds) = $recv_cmd->($self->{sock}, my $buf, 4096 * 33);
+		if (scalar(@fds) == 1 && !defined($fds[0])) {
+			return if $!{EAGAIN};
+			next if $!{EINTR};
+			die "recvmsg: $!";
+		}
+		# open so perl can auto-close them:
+		for my $fd (@fds) {
+			open(my $newfh, '+<&=', $fd) or die "open +<&=$fd: $!";
+		}
+		return $self->close if $buf eq '';
+		warn Dumper({ 'unexpected self msg' => $buf, fds => \@fds });
+		# TODO: figure out what to do with these messages...
+	}
+}
+
+1;
diff --git a/lib/PublicInbox/LeiWatch.pm b/lib/PublicInbox/LeiWatch.pm
new file mode 100644
index 00000000..35267b58
--- /dev/null
+++ b/lib/PublicInbox/LeiWatch.pm
@@ -0,0 +1,13 @@
+# Copyright all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# represents a Maildir or IMAP "watch" item
+package PublicInbox::LeiWatch;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+
+# "url" may be something like "maildir:/path/to/dir"
+sub new { bless { url => $_[1] }, $_[0] }
+
+1;
diff --git a/t/lei-watch.t b/t/lei-watch.t
new file mode 100644
index 00000000..3a2f9e64
--- /dev/null
+++ b/t/lei-watch.t
@@ -0,0 +1,49 @@
+#!perl -w
+# Copyright all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict; use v5.10.1; use PublicInbox::TestCommon;
+use File::Path qw(make_path);
+require_mods('lei');
+my $have_fast_inotify = eval { require Linux::Inotify2 } ||
+	eval { require IO::KQueue };
+
+$have_fast_inotify or
+	diag("$0 IO::KQueue or Linux::Inotify2 missing, test will be slow");
+
+my ($ro_home, $cfg_path) = setup_public_inboxes;
+test_lei(sub {
+	my $md = "$ENV{HOME}/md";
+	my $md2 = $md.'2';
+	lei_ok 'ls-watch';
+	is($lei_out, '', 'nothing in ls-watch, yet');
+	if (0) { # TODO
+		my $url = 'imaps://example.com/foo.bar.0';
+		lei_ok([qw(add-watch --state=pause), $url], undef, {});
+		lei_ok 'ls-watch';
+		is($lei_out, "$url\n", 'ls-watch shows added watch');
+		ok(!lei(qw(add-watch --state=pause), 'bogus'.$url),
+			'bogus URL rejected');
+	}
+
+	# first, make sure tag-ro works
+	make_path("$md/new", "$md/cur", "$md/tmp");
+	lei_ok qw(add-watch --state=tag-ro), $md;
+	lei_ok 'ls-watch';
+	like($lei_out, qr/^\Qmaildir:$md\E$/sm, 'maildir shown');
+	lei_ok qw(q mid:testmessage@example.com -o), $md, '-I', "$ro_home/t1";
+	my @f = glob("$md/cur/*:2,");
+	is(scalar(@f), 1, 'got populated maildir with one result');
+	rename($f[0], "$f[0]S") or xbail "rename $!"; # set (S)een
+	$have_fast_inotify or tick(2);
+	lei_ok qw(note-event done); # flushes immediately (instead of 5s)
+
+	lei_ok qw(q mid:testmessage@example.com -o), $md2, '-I', "$ro_home/t1";
+	my @f2 = glob("$md2/*/*");
+	is(scalar(@f2), 1, 'got one result');
+	like($f2[0], qr/S\z/, 'seen set from rename');
+	my $e2 = eml_load($f2[0]);
+	my $e1 = eml_load("$f[0]S");
+	is_deeply($e2, $e1, 'results match');
+});
+
+done_testing;
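
For orientation, the path handling in dir_idle_handler and the flag
handling in lei_note_event can be tried outside the daemon.  The
sketch below reuses the path pattern from the patch; everything else
is an assumption made for illustration: the name
parse_maildir_event_sketch, the flag-to-keyword table (the usual
Maildir info letters), and the simplified suffix parsing, which is
not PublicInbox::MdirReader.

```perl
use strict;
use warnings;

# assumption: the usual Maildir info letters; T (trashed) means
# "ignore this message", as in the patch
my %FLAG2KW = (D => 'draft', F => 'flagged', P => 'forwarded',
		R => 'answered', S => 'seen');

# returns (maildir, "new"|"cur", basename, [keywords]) or an empty list
sub parse_maildir_event_sketch {
	my ($fn) = @_;
	$fn =~ m!\A(.+)/(new|cur)/([^/]+)\z! or return;
	my ($mdir, $nc, $bn) = ($1, $2, $3);
	# no ":2," info suffix means no flags
	my $fl = $bn =~ /:2,([A-Za-z]*)\z/ ? $1 : '';
	return if index($fl, 'T') >= 0; # trashed
	my @kw = sort map { $FLAG2KW{$_} // () } split(//, $fl);
	($mdir, $nc, $bn, \@kw);
}

my @ev = parse_maildir_event_sketch(
	'/home/u/md/cur/1626685140.M1P2.host:2,RS');
print "$ev[0] $ev[1] kw=@{$ev[3]}\n" if @ev;
my @trashed = parse_maildir_event_sketch('/home/u/md/cur/1.host:2,ST');
my @other = parse_maildir_event_sketch('/home/u/md/tmp/1.host');
```

Files under tmp/ and trashed messages yield an empty list, so a
caller can simply skip them.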


* [PATCH 3/2] lei: auto-refresh watches in config, cancel missing
  2021-07-19  8:59 [PATCH 0/2] lei Maildir inotify/kevent support Eric Wong
  2021-07-19  8:59 ` [PATCH 1/2] config: s/_one_val/get_1/ for public use Eric Wong
  2021-07-19  8:59 ` [PATCH 2/2] lei: start implementing inotify Maildir support Eric Wong
@ 2021-07-21 14:07 ` Eric Wong
  2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2021-07-21 14:07 UTC
  To: meta

This makes behavior less surprising since we no longer lose watch
state on restarts, so there's no need to manually run "lei
add-watch" to re-enable watches.  This also allows us to
transparently handle changes if somebody edits the lei config
file directly or via git-config(1).
---
 lib/PublicInbox/DirIdle.pm      |  5 +++-
 lib/PublicInbox/LEI.pm          | 50 +++++++++++++++++++++++++--------
 lib/PublicInbox/LeiNoteEvent.pm |  2 +-
 t/lei-watch.t                   | 39 +++++++++++++++++++++++++
 4 files changed, 83 insertions(+), 13 deletions(-)
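
The refresh logic amounts to reconciling the set of active watches
with what the config currently lists: start what is new, keep what
is unchanged, cancel what disappeared.  A minimal sketch of that
reconciliation follows.  reconcile_sketch, the callbacks and the
"h-..." handles are hypothetical and stand in for the inotify watch
objects; this is not refresh_watches itself.

```perl
use strict;
use warnings;

# hypothetical: %$active maps watch URL => handle; @$configured lists
# the URLs currently present in the config file
sub reconcile_sketch {
	my ($active, $configured, $start, $cancel) = @_;
	my %seen;
	for my $url (@$configured) {
		$seen{$url} = undef;
		$active->{$url} //= $start->($url); # keep existing handles
	}
	# keys() returns a copy, so deleting inside the loop is safe
	for my $url (grep { !exists $seen{$_} } keys %$active) {
		$cancel->(delete $active->{$url}); # removed from the config
	}
	$active;
}

my (@started, @cancelled);
my $active = { 'maildir:/a' => 'h-a', 'maildir:/b' => 'h-b' };
reconcile_sketch($active, [ 'maildir:/b', 'maildir:/c' ],
	sub { push @started, $_[0]; "h-$_[0]" },
	sub { push @cancelled, $_[0] });
print "started=@started cancelled=@cancelled active=",
	join(',', sort keys %$active), "\n";
```

Running the same reconciliation on every config reload is what
makes hand edits to the config file take effect without a separate
command.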

diff --git a/lib/PublicInbox/DirIdle.pm b/lib/PublicInbox/DirIdle.pm
index 7031e5fd..65896f95 100644
--- a/lib/PublicInbox/DirIdle.pm
+++ b/lib/PublicInbox/DirIdle.pm
@@ -56,10 +56,13 @@ sub new {
 sub add_watches {
 	my ($self, $dirs, $gone) = @_;
 	my $fl = $MAIL_IN | ($gone ? $MAIL_GONE : 0);
+	my @ret;
 	for my $d (@$dirs) {
-		$self->{inot}->watch($d, $fl);
+		my $w = $self->{inot}->watch($d, $fl) or next;
+		push @ret, $w;
 	}
 	PublicInbox::FakeInotify::poll_once($self) if !$ino_cls;
+	@ret
 }
 
 sub rm_watches {
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index b92d7512..52c551cf 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -36,7 +36,7 @@ my $GLP_PASS = Getopt::Long::Parser->new;
 $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through));
 
 our %PATH2CFG; # persistent for socket daemon
-our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => undef }
+our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] }
 
 # TBD: this is a documentation mechanism to show a subcommand
 # (may) pass options through to another command:
@@ -820,6 +820,8 @@ sub _lei_cfg ($;$) {
 		}
 	}
 	$self->{cfg} = $PATH2CFG{$f} = $cfg;
+	refresh_watches($self);
+	$cfg;
 }
 
 sub _lei_store ($;$) {
@@ -1353,36 +1355,62 @@ sub watch_state_ok ($) {
 	$state =~ /\Apause|(?:import|index|tag)-(?:ro|rw)\z/;
 }
 
+sub cancel_maildir_watch ($$) {
+	my ($d, $cfg_f) = @_;
+	my $w = delete $MDIR2CFGPATH->{$d}->{$cfg_f};
+	scalar(keys %{$MDIR2CFGPATH->{$d}}) or
+		delete $MDIR2CFGPATH->{$d};
+	for my $x (@{$w // []}) { $x->cancel }
+}
+
 sub refresh_watches {
 	my ($lei) = @_;
 	my $cfg = _lei_cfg($lei) or return;
-	$cfg->{-env} //= { %{$lei->{env}}, PWD => '/' }; # for cfg2lei
+	my $old = $cfg->{-watches};
 	my $watches = $cfg->{-watches} //= {};
-	require PublicInbox::LeiWatch;
+	my %seen;
+	my $cfg_f = $cfg->{'-f'};
 	for my $w (grep(/\Awatch\..+\.state\z/, keys %$cfg)) {
 		my $url = substr($w, length('watch.'), -length('.state'));
-		my $lw = $watches->{$w} //= PublicInbox::LeiWatch->new($url);
+		require PublicInbox::LeiWatch;
+		my $lw = $watches->{$url} //= PublicInbox::LeiWatch->new($url);
+		$seen{$url} = undef;
 		my $state = $cfg->get_1("watch.$url", 'state');
 		if (!watch_state_ok($state)) {
 			$lei->err("watch.$url.state=$state not supported");
 			next;
 		}
-		my $f = $cfg->{'-f'};
 		if ($url =~ /\Amaildir:(.+)/i) {
 			my $d = File::Spec->canonpath($1);
 			if ($state eq 'pause') {
-				delete $MDIR2CFGPATH->{$d}->{$f};
-				scalar(keys %{$MDIR2CFGPATH->{$d}}) or
-					delete $MDIR2CFGPATH->{$d};
-			} elsif (!exists($MDIR2CFGPATH->{$d}->{$f})) {
-				$dir_idle->add_watches(["$d/cur", "$d/new"], 1);
-				$MDIR2CFGPATH->{$d}->{$f} = undef;
+				cancel_maildir_watch($d, $cfg_f);
+			} elsif (!exists($MDIR2CFGPATH->{$d}->{$cfg_f})) {
+				my @w = $dir_idle->add_watches(
+						["$d/cur", "$d/new"], 1);
+				push @{$MDIR2CFGPATH->{$d}->{$cfg_f}}, @w if @w;
 			}
 		} else { # TODO: imap/nntp/jmap
 			$lei->child_error(1,
 				"E: watch $url not supported, yet");
 		}
 	}
+	if ($old) { # cull old non-existent entries
+		for my $url (keys %$old) {
+			next if exists $seen{$url};
+			delete $old->{$url};
+			if ($url =~ /\Amaildir:(.+)/i) {
+				my $d = File::Spec->canonpath($1);
+				cancel_maildir_watch($d, $cfg_f);
+			} else { # TODO: imap/nntp/jmap
+				$lei->child_error(1, "E: watch $url TODO");
+			}
+		}
+	}
+	if (scalar keys %$watches) {
+		$cfg->{-env} //= { %{$lei->{env}}, PWD => '/' }; # for cfg2lei
+	} else {
+		delete $cfg->{-watches};
+	}
 }
 
 1;
diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm
index bf15cd26..d6511cf6 100644
--- a/lib/PublicInbox/LeiNoteEvent.pm
+++ b/lib/PublicInbox/LeiNoteEvent.pm
@@ -14,7 +14,7 @@ sub flush_lei ($) {
 	my ($lei) = @_;
 	if (my $lne = delete $lei->{cfg}->{-lei_note_event}) {
 		$lne->wq_close(1, undef, $lei); # runs _lei_wq_eof;
-	} else { # lms_clear_src calls only:
+	} elsif ($lei->{sto}) { # lms_clear_src calls only:
 		my $wait = $lei->{sto}->ipc_do('done');
 	}
 }
diff --git a/t/lei-watch.t b/t/lei-watch.t
index 3a2f9e64..492f6c1d 100644
--- a/t/lei-watch.t
+++ b/t/lei-watch.t
@@ -13,9 +13,27 @@ $have_fast_inotify or
 my ($ro_home, $cfg_path) = setup_public_inboxes;
 test_lei(sub {
 	my $md = "$ENV{HOME}/md";
+	my $cfg_f = "$ENV{HOME}/.config/lei/config";
 	my $md2 = $md.'2';
 	lei_ok 'ls-watch';
 	is($lei_out, '', 'nothing in ls-watch, yet');
+
+	my ($ino_fdinfo, $ino_contents);
+	SKIP: {
+		$have_fast_inotify && $^O eq 'linux' or
+			skip 'Linux/inotify-only internals check', 1;
+		lei_ok 'daemon-pid'; chomp(my $pid = $lei_out);
+		skip 'missing /proc/$PID/fd', 1 if !-d "/proc/$pid/fd";
+		my @ino = grep {
+			readlink($_) =~ /\binotify\b/
+		} glob("/proc/$pid/fd/*");
+		is(scalar(@ino), 1, 'only one inotify FD');
+		my $ino_fd = (split('/', $ino[0]))[-1];
+		$ino_fdinfo = "/proc/$pid/fdinfo/$ino_fd";
+		open my $fh, '<', $ino_fdinfo or xbail "open $ino_fdinfo: $!";
+		$ino_contents = [ <$fh> ];
+	}
+
 	if (0) { # TODO
 		my $url = 'imaps://example.com/foo.bar.0';
 		lei_ok([qw(add-watch --state=pause), $url], undef, {});
@@ -44,6 +62,27 @@ test_lei(sub {
 	my $e2 = eml_load($f2[0]);
 	my $e1 = eml_load("$f[0]S");
 	is_deeply($e2, $e1, 'results match');
+
+	SKIP: {
+		$ino_fdinfo or skip 'Linux/inotify-only watch check', 1;
+		open my $fh, '<', $ino_fdinfo or xbail "open $ino_fdinfo: $!";
+		my $cmp = [ <$fh> ];
+		ok(scalar(@$cmp) > scalar(@$ino_contents),
+			'inotify has Maildir watches');
+	}
+
+	is(xsys(qw(git config -f), $cfg_f,
+			'--remove-section', "watch.maildir:$md"),
+		0, 'unset config state');
+	lei_ok 'ls-watch', \'refresh watches';
+	is($lei_out, '', 'no watches left');
+
+	SKIP: {
+		$ino_fdinfo or skip 'Linux/inotify-only removal check', 1;
+		open my $fh, '<', $ino_fdinfo or xbail "open $ino_fdinfo: $!";
+		my $cmp = [ <$fh> ];
+		is_deeply($cmp, $ino_contents, 'inotify Maildir watches gone');
+	};
 });
 
 done_testing;

