From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 270ED1FA13 for ; Tue, 15 Dec 2020 11:47:23 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [RFC 3/7] lei: FD-passing and IPC basics Date: Tue, 15 Dec 2020 11:47:18 +0000 Message-Id: <20201215114722.27400-4-e@80x24.org> In-Reply-To: <20201215114722.27400-1-e@80x24.org> References: <20201215114722.27400-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: The start of lei, a Local Email Interface. It'll support a daemon via FD passing to avoid startup time penalties if IO::FDPass is installed, but fall back to a slow one-shot mode if not. Compared to a traditional socket daemon, FD passing should allow us to eventually do stuff like run "git show" and still have proper terminal support for pager and color. 
--- MANIFEST | 3 + lib/PublicInbox/Daemon.pm | 6 +- lib/PublicInbox/LeiDaemon.pm | 303 +++++++++++++++++++++++++++++++++++ script/lei | 58 +++++++ t/lei.t | 80 +++++++++ 5 files changed, 448 insertions(+), 2 deletions(-) create mode 100644 lib/PublicInbox/LeiDaemon.pm create mode 100755 script/lei create mode 100644 t/lei.t diff --git a/MANIFEST b/MANIFEST index ac442606..7536b7c2 100644 --- a/MANIFEST +++ b/MANIFEST @@ -159,6 +159,7 @@ lib/PublicInbox/InboxIdle.pm lib/PublicInbox/InboxWritable.pm lib/PublicInbox/Isearch.pm lib/PublicInbox/KQNotify.pm +lib/PublicInbox/LeiDaemon.pm lib/PublicInbox/Linkify.pm lib/PublicInbox/Listener.pm lib/PublicInbox/Lock.pm @@ -226,6 +227,7 @@ sa_config/Makefile sa_config/README sa_config/root/etc/spamassassin/public-inbox.pre sa_config/user/.spamassassin/user_prefs +script/lei script/public-inbox-compact script/public-inbox-convert script/public-inbox-edit @@ -316,6 +318,7 @@ t/indexlevels-mirror.t t/init.t t/iso-2202-jp.eml t/kqnotify.t +t/lei.t t/linkify.t t/main-bin/spamc t/mda-mime.eml diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index a2171535..6b92b60d 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -1,7 +1,9 @@ # Copyright (C) 2015-2020 all contributors # License: AGPL-3.0+ -# contains common daemon code for the httpd, imapd, and nntpd servers. -# This may be used for read-only IMAP server if we decide to implement it. +# +# Contains common daemon code for the httpd, imapd, and nntpd servers +# and designed for handling thousands of untrusted clients over slow +# and/or lossy connections. package PublicInbox::Daemon; use strict; use warnings; diff --git a/lib/PublicInbox/LeiDaemon.pm b/lib/PublicInbox/LeiDaemon.pm new file mode 100644 index 00000000..ae40b3a6 --- /dev/null +++ b/lib/PublicInbox/LeiDaemon.pm @@ -0,0 +1,303 @@ +# Copyright (C) 2020 all contributors +# License: AGPL-3.0+ + +# Backend for `lei' (local email interface). 
Unlike the C10K-oriented +# PublicInbox::Daemon, this is designed exclusively to handle trusted +# local clients with read/write access to the FS and use as many +# system resources as the local user has access to. +package PublicInbox::LeiDaemon; +use strict; +use v5.10.1; +use parent qw(PublicInbox::DS); +use Getopt::Long (); +use Errno qw(EAGAIN ECONNREFUSED ENOENT); +use POSIX qw(setsid); +use IO::Socket::UNIX; +use IO::Handle (); +use Sys::Syslog qw(syslog openlog); +use PublicInbox::Syscall qw($SFD_NONBLOCK EPOLLIN EPOLLONESHOT); +use PublicInbox::Sigfd; +use PublicInbox::DS qw(now); +use PublicInbox::Spawn qw(spawn); +our $quit = sub { exit(shift // 0) }; +my $glp = Getopt::Long::Parser->new; +$glp->configure(qw(gnu_getopt no_ignore_case auto_abbrev)); + +sub x_it ($$) { # pronounced "exit" + my ($client, $code) = @_; + if (my $sig = ($code & 127)) { + kill($sig, $client->{pid} // $$); + } else { + $code >>= 8; + if (my $sock = $client->{sock}) { + say $sock "exit=$code"; + } else { # for oneshot + $quit->($code); + } + } +} + +sub emit ($$$) { + my ($client, $channel, $buf) = @_; + print { $client->{$channel} } $buf or warn "print FD[$channel]: $!"; +} + +sub fail ($$;$) { + my ($client, $buf, $exit_code) = @_; + $buf .= "\n" unless $buf =~ /\n\z/s; + emit($client, 2, $buf); + x_it($client, ($exit_code // 1) << 8); + undef; +} + +sub _help ($;$) { + my ($client, $channel) = @_; + emit($client, $channel //= 1, < failure +} + +sub assert_args ($$$;$@) { + my ($client, $argv, $proto, $opt, @spec) = @_; + $opt //= {}; + push @spec, qw(help|h); + $glp->getoptionsfromarray($argv, $opt, @spec) or + return fail($client, 'bad arguments or options'); + if ($opt->{help}) { + _help($client); + undef; + } else { + my ($nreq, $rest) = split(/;/, $proto); + $nreq = (($nreq // '') =~ tr/$/$/); + my $argc = scalar(@$argv); + my $tot = ($rest // '') eq '@' ? 
$argc : ($proto =~ tr/$/$/); + return 1 if $argc <= $tot && $argc >= $nreq; + _help($client, 2); + undef + } +} + +sub dispatch { + my ($client, $cmd, @argv) = @_; + local $SIG{__WARN__} = sub { emit($client, 2, "@_") }; + local $SIG{__DIE__} = 'DEFAULT'; + if (defined $cmd) { + my $func = "lei_$cmd"; + $func =~ tr/-/_/; + if (my $cb = __PACKAGE__->can($func)) { + $client->{cmd} = $cmd; + $cb->($client, \@argv); + } elsif (grep(/\A-/, $cmd, @argv)) { + assert_args($client, [ $cmd, @argv ], ''); + } else { + fail($client, "`$cmd' is not an lei command"); + } + } else { + _help($client, 2); + } +} + +sub lei_daemon_pid { + my ($client, $argv) = @_; + assert_args($client, $argv, '') and emit($client, 1, "$$\n"); +} + +sub lei_DBG_pwd { + my ($client, $argv) = @_; + assert_args($client, $argv, '') and + emit($client, 1, "$client->{env}->{PWD}\n"); +} + +sub lei_DBG_cwd { + my ($client, $argv) = @_; + require Cwd; + assert_args($client, $argv, '') and emit($client, 1, Cwd::cwd()."\n"); +} + +sub lei_DBG_false { x_it($_[0], 1 << 8) } + +sub lei_daemon_stop { + my ($client, $argv) = @_; + assert_args($client, $argv, '') and $quit->(0); +} + +sub lei_help { _help($_[0]) } + +sub reap_exec { # dwaitpid callback + my ($client, $pid) = @_; + x_it($client, $?); +} + +sub lei_git { # support passing through random git commands + my ($client, $argv) = @_; + my %opt = map { $_ => $client->{$_} } (0..2); + my $pid = spawn(['git', @$argv], $client->{env}, \%opt); + PublicInbox::DS::dwaitpid($pid, \&reap_exec, $client); +} + +sub accept_dispatch { # Listener {post_accept} callback + my ($sock) = @_; # ignore other + $sock->blocking(1); + $sock->autoflush(1); + my $client = { sock => $sock }; + vec(my $rin = '', fileno($sock), 1) = 1; + # `say $sock' triggers "die" in lei(1) + for my $i (0..2) { + if (select(my $rout = $rin, undef, undef, 1)) { + my $fd = IO::FDPass::recv(fileno($sock)); + if ($fd >= 0) { + my $rdr = ($fd == 0 ? 
'<&=' : '>&='); + if (open(my $fh, $rdr, $fd)) { + $client->{$i} = $fh; + } else { + say $sock "open($rdr$fd) (FD=$i): $!"; + return; + } + } else { + say $sock "recv FD=$i: $!"; + return; + } + } else { + say $sock "timed out waiting to recv FD=$i"; + return; + } + } + # $ARGV_STR = join("]\0[", @ARGV); + # $ENV_STR = join('', map { "$_=$ENV{$_}\0" } keys %ENV); + # $line = "$$\0\0>$ARGV_STR\0\0>$ENV_STR\0\0"; + my ($client_pid, $argv, $env) = do { + local $/ = "\0\0\0"; # yes, 3 NULs at EOL, not 2 + chomp(my $line = <$sock>); + split(/\0\0>/, $line, 3); + }; + my %env = map { split(/=/, $_, 2) } split(/\0/, $env); + if (chdir($env{PWD})) { + $client->{env} = \%env; + $client->{pid} = $client_pid; + eval { dispatch($client, split(/\]\0\[/, $argv)) }; + say $sock $@ if $@; + } else { + say $sock "chdir($env{PWD}): $!"; # implicit close + } +} + +sub noop {} + +# lei(1) calls this when it can't connect +sub lazy_start ($$) { + my ($path, $err) = @_; + if ($err == ECONNREFUSED) { + unlink($path) or die "unlink($path): $!"; + } elsif ($err != ENOENT) { + die "connect($path): $!"; + } + my $umask = umask(077) // die("umask(077): $!"); + my $l = IO::Socket::UNIX->new(Local => $path, + Listen => 1024, + Type => SOCK_STREAM) or + $err = $!; + umask($umask) or die("umask(restore): $!"); + $l or return $err; + my @st = stat($path) or die "stat($path): $!"; + my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino + pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; + my $oldset = PublicInbox::Sigfd::block_signals(); + my $pid = fork // die "fork: $!"; + if ($pid) { + PublicInbox::Sigfd::sig_setmask($oldset); + return; # client will connect to $path + } + openlog($path, 'pid', 'user'); + local $SIG{__DIE__} = sub { + syslog('crit', "@_"); + exit $! if $!; + exit $? >> 8 if $? 
>> 8; + exit 255; + }; + local $SIG{__WARN__} = sub { syslog('warning', "@_") }; + open(STDIN, '+<', '/dev/null') or die "redirect stdin failed: $!\n"; + open STDOUT, '>&STDIN' or die "redirect stdout failed: $!\n"; + open STDERR, '>&STDIN' or die "redirect stderr failed: $!\n"; + setsid(); + $pid = fork // die "fork: $!"; + exit if $pid; + $0 = "lei-daemon $path"; + require PublicInbox::Listener; + require PublicInbox::EOFpipe; + $l->blocking(0); + $eof_w->blocking(0); + $eof_r->blocking(0); + my $listener = PublicInbox::Listener->new($l, \&accept_dispatch, $l); + my $exit_code; + local $quit = sub { + $exit_code //= shift; + my $tmp = $listener or exit($exit_code); + unlink($path) if defined($path); + syswrite($eof_w, '.'); + $l = $listener = $path = undef; + $tmp->close if $tmp; # DS::close + PublicInbox::DS->SetLoopTimeout(1000); + }; + PublicInbox::EOFpipe->new($eof_r, sub {}, undef); + my $sig = { + CHLD => \&PublicInbox::DS::enqueue_reap, + QUIT => $quit, + INT => $quit, + TERM => $quit, + HUP => \&noop, + USR1 => \&noop, + USR2 => \&noop, + }; + my $sigfd = PublicInbox::Sigfd->new($sig, $SFD_NONBLOCK); + local %SIG = (%SIG, %$sig) if !$sigfd; + if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets + PublicInbox::DS->SetLoopTimeout(5000); + } else { + # wake up every second to accept signals if we don't + # have signalfd or IO::KQueue: + PublicInbox::Sigfd::sig_setmask($oldset); + PublicInbox::DS->SetLoopTimeout(1000); + } + PublicInbox::DS->SetPostLoopCallback(sub { + my ($dmap, undef) = @_; + if (@st = defined($path) ? 
stat($path) : ()) { + if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) { + warn "$path dev/ino changed, quitting\n"; + $path = undef; + } + } elsif (defined($path)) { + warn "stat($path): $!, quitting ...\n"; + undef $path; # don't unlink + $quit->(); + } + return 1 if defined($path); + my $now = now(); + my $n = 0; + for my $s (values %$dmap) { + $s->can('busy') or next; + if ($s->busy($now)) { + ++$n; + } else { + $s->close; + } + } + $n; # true: continue, false: stop + }); + PublicInbox::DS->EventLoop; + exit($exit_code // 0); +} + +# for users w/o IO::FDPass +sub oneshot { + dispatch({ + 0 => *STDIN{IO}, + 1 => *STDOUT{IO}, + 2 => *STDERR{IO}, + env => \%ENV + }, @ARGV); +} + +1; diff --git a/script/lei b/script/lei new file mode 100755 index 00000000..1b5af3a1 --- /dev/null +++ b/script/lei @@ -0,0 +1,58 @@ +#!perl -w +# Copyright (C) 2020 all contributors +# License: AGPL-3.0+ +use strict; +use v5.10.1; +use Cwd qw(cwd); +use IO::Socket::UNIX; + +if (eval { require IO::FDPass; 1 }) { # use daemon to reduce load time + my $path = do { + my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . '/lei'; + if ($runtime_dir eq '/lei') { + require File::Spec; + $runtime_dir = File::Spec->tmpdir."/lei-$<"; + } + unless (-d $runtime_dir && -w _) { + require File::Path; + File::Path::mkpath($runtime_dir, 0, 0700); + } + "$runtime_dir/sock"; + }; + my $sock = IO::Socket::UNIX->new(Peer => $path, Type => SOCK_STREAM); + unless ($sock) { # start the daemon if not started + my $err = $!; + require PublicInbox::LeiDaemon; + $err = PublicInbox::LeiDaemon::lazy_start($path, $err); + # try connecting again anyways, unlink+bind may be racy + $sock = IO::Socket::UNIX->new(Peer => $path, + Type => SOCK_STREAM) // die + "connect($path): $! 
(bind($path): $err)"; + } + my $pwd = $ENV{PWD}; + my $cwd = cwd(); + if ($pwd) { # prefer ENV{PWD} if it's a symlink to real cwd + my @st_cwd = stat($cwd) or die "stat(cwd=$cwd): $!\n"; + my @st_pwd = stat($pwd); + # make sure st_dev/st_ino match for {PWD} to be valid + $pwd = $cwd if (!@st_pwd || $st_pwd[1] != $st_cwd[1] || + $st_pwd[0] != $st_cwd[0]); + } else { + $pwd = $cwd; + } + local $ENV{PWD} = $pwd; + $sock->autoflush(1); + IO::FDPass::send(fileno($sock), $_) for (0..2); + my $buf = "$$\0\0>" . join("]\0[", @ARGV) . "\0\0>"; + while (my ($k, $v) = each %ENV) { $buf .= "$k=$v\0" } + $buf .= "\0\0"; + print $sock $buf or die "print(sock, buf): $!"; + local $/ = "\n"; + while (my $line = <$sock>) { + $line =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0); + die $line; + } +} else { # for systems lacking IO::FDPass + require PublicInbox::LeiDaemon; + PublicInbox::LeiDaemon::oneshot(); +} diff --git a/t/lei.t b/t/lei.t new file mode 100644 index 00000000..feee9270 --- /dev/null +++ b/t/lei.t @@ -0,0 +1,80 @@ +#!perl -w +# Copyright (C) 2020 all contributors +# License: AGPL-3.0+ +use strict; +use v5.10.1; +use Test::More; +use PublicInbox::TestCommon; +use PublicInbox::Config; +my $json = PublicInbox::Config::json() or plan skip_all => 'JSON missing'; +require_mods(qw(DBD::SQLite Search::Xapian)); +my ($home, $for_destroy) = tmpdir(); +my $opt = { 1 => \(my $out = ''), 2 => \(my $err = '') }; + +SKIP: { + require_mods('IO::FDPass', 51); + local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run"; + mkdir "$home/xdg_run", 0700 or BAIL_OUT "mkdir: $!"; + my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/sock"; + + ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid'); + is($err, '', 'no error from daemon-pid'); + like($out, qr/\A[0-9]+\n\z/s, 'pid returned') or BAIL_OUT; + chomp(my $pid = $out); + ok(kill(0, $pid), 'pid is valid'); + ok(-S $sock, 'sock created'); + + ok(!run_script([qw(lei)], undef, $opt), 'no args fails'); + is($? >> 8, 1, '$? 
is 1'); + is($out, '', 'nothing in stdout'); + like($err, qr/^usage:/sm, 'usage in stderr'); + + for my $arg (['-h'], ['--help'], ['help'], [qw(daemon-pid --help)]) { + $out = $err = ''; + ok(run_script(['lei', @$arg], undef, $opt), "lei @$arg"); + like($out, qr/^usage:/sm, "usage in stdout (@$arg)"); + is($err, '', "nothing in stderr (@$arg)"); + } + + ok(!run_script([qw(lei DBG-false)], undef, $opt), 'false(1) emulation'); + is($? >> 8, 1, '$? set correctly'); + is($err, '', 'no error from false(1) emulation'); + + for my $arg ([''], ['--halp'], ['halp'], [qw(daemon-pid --halp)]) { + $out = $err = ''; + ok(!run_script(['lei', @$arg], undef, $opt), "lei @$arg"); + is($? >> 8, 1, '$? set correctly'); + isnt($err, '', 'something in stderr'); + is($out, '', 'nothing in stdout'); + } + + $out = ''; + ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid'); + chomp(my $pid_again = $out); + is($pid, $pid_again, 'daemon-pid idempotent'); + + ok(run_script([qw(lei daemon-stop)], undef, $opt), 'daemon-stop'); + is($out, '', 'no output from daemon-stop'); + is($err, '', 'no error from daemon-stop'); + for (0..100) { + kill(0, $pid) or last; + tick(); + } + ok(!-S $sock, 'sock gone'); + ok(!kill(0, $pid), 'pid gone after stop'); + + ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid'); + chomp(my $new_pid = $out); + ok(kill(0, $new_pid), 'new pid is running'); + ok(-S $sock, 'sock exists again'); + unlink $sock or BAIL_OUT "unlink $!"; + for (0..100) { + kill('CHLD', $new_pid) or last; + tick(); + } + ok(!kill(0, $new_pid), 'daemon exits after unlink'); +}; + +require_ok 'PublicInbox::LeiDaemon'; + +done_testing;