* [PATCH 01/21] lei: drop stores explicitly at daemon shutdown
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 02/21] ds: hoist out close_non_busy Eric Wong
` (19 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
This will allow us to avoid unblocking signals during
shutdown to simplify our code.
---
lib/PublicInbox/DS.pm | 3 +--
lib/PublicInbox/LEI.pm | 13 ++++++++++++-
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 142122a8..c476311b 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -38,10 +38,9 @@ our @EXPORT_OK = qw(now msg_more awaitpid add_timer add_uniq_timer);
my %Stack;
my $nextq; # queue for next_tick
-my $AWAIT_PIDS; # pid => [ $callback, @args ]
my $reap_armed;
my $ToClose; # sockets to close when event loop is done
-our (
+our ($AWAIT_PIDS, # pid => [ $callback, @args ]
%DescriptorMap, # fd (num) -> PublicInbox::DS object
$Poller, # global Select, Epoll, DSPoll, or DSKQXS ref
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 10c08b90..368eee26 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1271,6 +1271,15 @@ sub dir_idle_handler ($) { # PublicInbox::DirIdle callback
}
}
+sub drop_all_stores () {
+ for my $cfg (values %PATH2CFG) {
+ my $sto = delete($cfg->{-lei_store}) // next;
+ eval { $sto->wq_io_do('done') };
+ warn "E: $@ (dropping store for $cfg->{-f})" if $@;
+ $sto->wq_close;
+ }
+}
+
# lei(1) calls this when it can't connect
sub lazy_start {
my ($path, $errno, $narg) = @_;
@@ -1367,7 +1376,9 @@ sub lazy_start {
$s->close;
}
}
- $n; # true: continue, false: stop
+ drop_all_stores() if !$n; # drop stores only if no clients
+ # returns true: continue, false: stop
+ $n + scalar(keys(%$PublicInbox::DS::AWAIT_PIDS));
});
# STDIN was redirected to /dev/null above, closing STDERR and
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 02/21] ds: hoist out close_non_busy
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
2023-10-04 3:49 ` [PATCH 01/21] lei: drop stores explicitly at daemon shutdown Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 03/21] ds: don't pass FD map to post_loop_do callback Eric Wong
` (18 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
It's shared by both lei and public-facing daemons in using
the ->busy callback.
---
lib/PublicInbox/DS.pm | 10 ++++++++++
lib/PublicInbox/Daemon.pm | 10 +---------
lib/PublicInbox/LEI.pm | 12 ++----------
3 files changed, 13 insertions(+), 19 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index c476311b..ecfb581d 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -233,6 +233,16 @@ sub enqueue_reap () { $reap_armed //= requeue(\&reap_pids) }
sub in_loop () { $in_loop }
+# use inside @post_loop_do, returns number of busy clients
+sub close_non_busy () {
+ my $n = 0;
+ for my $s (values %DescriptorMap) {
+ # close as much as possible, early as possible
+ ($s->busy ? ++$n : $s->close) if $s->can('busy');
+ }
+ $n;
+}
+
# Internal function: run the post-event callback, send read events
# for pushed-back data, and close pending connections. returns 1
# if event loop should continue, or 0 to shut it all down.
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 7546105e..5250610b 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -364,16 +364,8 @@ sub worker_quit { # $_[0] = signal name or number (unused)
# drop idle connections and try to quit gracefully
@PublicInbox::DS::post_loop_do = (sub {
my ($dmap, undef) = @_;
- my $n = 0;
my $now = now();
- for my $s (values %$dmap) {
- $s->can('busy') or next;
- if ($s->busy) {
- ++$n;
- } else { # close as much as possible, early as possible
- $s->close;
- }
- }
+ my $n = PublicInbox::DS::close_non_busy();
if ($n) {
if (($warn + 5) < now()) {
warn "$$ quitting, $n client(s) left\n";
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 368eee26..977a94c6 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1367,16 +1367,8 @@ sub lazy_start {
$quit->();
}
return 1 if defined($path);
- my $n = 0;
- for my $s (values %$dmap) {
- $s->can('busy') or next;
- if ($s->busy) {
- ++$n;
- } else {
- $s->close;
- }
- }
- drop_all_stores() if !$n; # drop stores only if no clients
+ my $n = PublicInbox::DS::close_non_busy() or
+ drop_all_stores(); # drop stores only if no clients
# returns true: continue, false: stop
$n + scalar(keys(%$PublicInbox::DS::AWAIT_PIDS));
});
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 03/21] ds: don't pass FD map to post_loop_do callback
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
2023-10-04 3:49 ` [PATCH 01/21] lei: drop stores explicitly at daemon shutdown Eric Wong
2023-10-04 3:49 ` [PATCH 02/21] ds: hoist out close_non_busy Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 04/21] move all non-test @post_loop_do into named subs Eric Wong
` (17 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
It's not used by any post_loop_do callbacks anymore, and the
underlying FD map is a global `our' variable accessible from
anywhere, anyways.
---
lib/PublicInbox/DS.pm | 4 +---
lib/PublicInbox/Daemon.pm | 1 -
lib/PublicInbox/IPC.pm | 4 ++--
lib/PublicInbox/LEI.pm | 1 -
4 files changed, 3 insertions(+), 7 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index ecfb581d..d8824a55 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -260,9 +260,7 @@ sub PostEventLoop () {
}
# by default we keep running, unless a postloop callback cancels it
- @post_loop_do ? $post_loop_do[0]->(\%DescriptorMap,
- @post_loop_do[1..$#post_loop_do])
- : 1
+ @post_loop_do ? $post_loop_do[0]->(@post_loop_do[1..$#post_loop_do]) : 1
}
sub sigset_prep ($$$) {
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 5250610b..e5755981 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -363,7 +363,6 @@ sub worker_quit { # $_[0] = signal name or number (unused)
my $warn = 0;
# drop idle connections and try to quit gracefully
@PublicInbox::DS::post_loop_do = (sub {
- my ($dmap, undef) = @_;
my $now = now();
my $n = PublicInbox::DS::close_non_busy();
if ($n) {
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 9388befd..9b4b1508 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -251,8 +251,8 @@ sub recv_and_run {
$n;
}
-sub sock_defined {
- my (undef, $wqw) = @_;
+sub sock_defined { # PublicInbox::DS::post_loop_do CB
+ my ($wqw) = @_;
defined($wqw->{sock});
}
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 977a94c6..afed84c1 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1355,7 +1355,6 @@ sub lazy_start {
});
$dir_idle->add_watches([$sock_dir]);
local @PublicInbox::DS::post_loop_do = (sub {
- my ($dmap, undef) = @_;
if (@st = defined($path) ? stat($path) : ()) {
if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
warn "$path dev/ino changed, quitting\n";
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 04/21] move all non-test @post_loop_do into named subs
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (2 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 03/21] ds: don't pass FD map to post_loop_do callback Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 05/21] lei: close DirIdle (inotify) early at daemon shutdown Eric Wong
` (16 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
Compared to Danga::Socket, our @post_loop_do API is designed to
make it easier to avoid anonymous subs (and their potential for
leaks in buggy old versions of Perl).
---
lib/PublicInbox/Daemon.pm | 38 ++++++++++++++++--------------
lib/PublicInbox/LEI.pm | 49 ++++++++++++++++++++-------------------
lib/PublicInbox/Watch.pm | 4 +++-
3 files changed, 48 insertions(+), 43 deletions(-)
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index e5755981..a4c99cca 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -353,31 +353,33 @@ EOF
bless { pid => $$, pid_file => \$pid_file }, __PACKAGE__;
}
+sub has_busy_clients { # post_loop_do CB
+ my ($state) = @_;
+ my $now = now();
+ my $n = PublicInbox::DS::close_non_busy();
+ if ($n) {
+ if ($state->{-w} < now()) {
+ warn "$$ quitting, $n client(s) left\n";
+ $state->{-w} = now() + 5;
+ }
+ unless (defined $state->{0}) {
+ $state->{0} = (split(/\s+/, $0))[0];
+ $state->{0} =~ s!\A.*?([^/]+)\z!$1!;
+ }
+ $0 = "$state->{0} quitting, $n client(s) left";
+ }
+ $n; # true: loop continues, false: loop breaks
+}
+
sub worker_quit { # $_[0] = signal name or number (unused)
# killing again terminates immediately:
exit unless @listeners;
$_->close foreach @listeners; # call PublicInbox::DS::close
@listeners = ();
- my $proc_name;
- my $warn = 0;
+
# drop idle connections and try to quit gracefully
- @PublicInbox::DS::post_loop_do = (sub {
- my $now = now();
- my $n = PublicInbox::DS::close_non_busy();
- if ($n) {
- if (($warn + 5) < now()) {
- warn "$$ quitting, $n client(s) left\n";
- $warn = now();
- }
- unless (defined $proc_name) {
- $proc_name = (split(/\s+/, $0))[0];
- $proc_name =~ s!\A.*?([^/]+)\z!$1!;
- }
- $0 = "$proc_name quitting, $n client(s) left";
- }
- $n; # true: loop continues, false: loop breaks
- });
+ @PublicInbox::DS::post_loop_do = (\&has_busy_clients, { -w => 0 })
}
sub reopen_logs {
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index afed84c1..74a7f5b9 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1271,13 +1271,30 @@ sub dir_idle_handler ($) { # PublicInbox::DirIdle callback
}
}
-sub drop_all_stores () {
- for my $cfg (values %PATH2CFG) {
- my $sto = delete($cfg->{-lei_store}) // next;
- eval { $sto->wq_io_do('done') };
- warn "E: $@ (dropping store for $cfg->{-f})" if $@;
- $sto->wq_close;
+sub can_stay_alive { # PublicInbox::DS::post_loop_do cb
+ my ($path, $dev_ino_expect) = @_;
+ if (my @st = defined($$path) ? stat($$path) : ()) {
+ if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
+ warn "$$path dev/ino changed, quitting\n";
+ $$path = undef;
+ }
+ } elsif (defined($$path)) { # ENOENT is common
+ warn "stat($$path): $!, quitting ...\n" if $! != ENOENT;
+ undef $$path;
+ $quit->();
}
+ return 1 if defined($$path);
+ my $n = PublicInbox::DS::close_non_busy() or do {
+ # drop stores only if no clients
+ for my $cfg (values %PATH2CFG) {
+ my $sto = delete($cfg->{-lei_store}) // next;
+ eval { $sto->wq_io_do('done') };
+ warn "E: $@ (dropping store for $cfg->{-f})" if $@;
+ $sto->wq_close;
+ }
+ };
+ # returns true: continue, false: stop
+ $n + scalar(keys(%$PublicInbox::DS::AWAIT_PIDS));
}
# lei(1) calls this when it can't connect
@@ -1354,24 +1371,8 @@ sub lazy_start {
dir_idle_handler($_[0]) if $_[0]->fullname ne $path;
});
$dir_idle->add_watches([$sock_dir]);
- local @PublicInbox::DS::post_loop_do = (sub {
- if (@st = defined($path) ? stat($path) : ()) {
- if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
- warn "$path dev/ino changed, quitting\n";
- $path = undef;
- }
- } elsif (defined($path)) { # ENOENT is common
- warn "stat($path): $!, quitting ...\n" if $! != ENOENT;
- undef $path;
- $quit->();
- }
- return 1 if defined($path);
- my $n = PublicInbox::DS::close_non_busy() or
- drop_all_stores(); # drop stores only if no clients
- # returns true: continue, false: stop
- $n + scalar(keys(%$PublicInbox::DS::AWAIT_PIDS));
- });
-
+ local @PublicInbox::DS::post_loop_do = (\&can_stay_alive,
+ \$path, $dev_ino_expect);
# STDIN was redirected to /dev/null above, closing STDERR and
# STDOUT will cause the calling `lei' client process to finish
# reading the <$daemon> pipe.
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index cf0720e3..3426d4a7 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -533,6 +533,8 @@ sub watch_nntp_init ($$) {
}
}
+sub quit_inprogress { !$_[0]->quit_done } # post_loop_do CB
+
sub watch { # main entry point
my ($self, $sig) = @_;
my $first_sig;
@@ -545,7 +547,7 @@ sub watch { # main entry point
add_timer(0, \&poll_fetch_fork, $self, $intvl, $uris);
}
watch_fs_init($self) if $self->{mdre};
- local @PublicInbox::DS::post_loop_do = (sub { !$self->quit_done });
+ local @PublicInbox::DS::post_loop_do = (\&quit_inprogress, $self);
PublicInbox::DS::event_loop($first_sig); # calls ->event_step
_done_for_now($self);
}
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 05/21] lei: close DirIdle (inotify) early at daemon shutdown
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (3 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 04/21] move all non-test @post_loop_do into named subs Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 06/21] input_pipe: {args} is never undefined Eric Wong
` (15 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
We don't want FS activity to delay lei-daemon shutdown.
---
lib/PublicInbox/DirIdle.pm | 12 +++++++++---
lib/PublicInbox/LEI.pm | 5 +++++
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/DirIdle.pm b/lib/PublicInbox/DirIdle.pm
index af99811c..de6f229b 100644
--- a/lib/PublicInbox/DirIdle.pm
+++ b/lib/PublicInbox/DirIdle.pm
@@ -68,10 +68,16 @@ sub rm_watches {
}
}
+sub close {
+ my ($self) = @_;
+ delete $self->{cb};
+ $self->SUPER::close; # if using real kevent/inotify
+}
+
sub event_step {
my ($self) = @_;
- my $cb = $self->{cb};
- local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
+ my $cb = $self->{cb} or return;
+ local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously (FIXME)
eval {
my @events = $self->{inot}->read; # Linux::Inotify2->read
$cb->($_) for @events;
@@ -83,7 +89,7 @@ sub force_close {
my ($self) = @_;
my $inot = delete $self->{inot} // return;
if ($inot->can('fh')) { # Linux::Inotify2 2.3+
- close($inot->fh) or warn "CLOSE ERROR: $!";
+ CORE::close($inot->fh) or warn "CLOSE ERROR: $!";
} elsif ($inot->isa('Linux::Inotify2')) {
require PublicInbox::LI2Wrap;
PublicInbox::LI2Wrap::wrapclose($inot);
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 74a7f5b9..8362800d 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1285,8 +1285,11 @@ sub can_stay_alive { # PublicInbox::DS::post_loop_do cb
}
return 1 if defined($$path);
my $n = PublicInbox::DS::close_non_busy() or do {
+ eval 'PublicInbox::LeiNoteEvent::flush_task()';
# drop stores only if no clients
for my $cfg (values %PATH2CFG) {
+ my $lne = delete($cfg->{-lei_note_event});
+ $lne->wq_close if $lne;
my $sto = delete($cfg->{-lei_store}) // next;
eval { $sto->wq_io_do('done') };
warn "E: $@ (dropping store for $cfg->{-f})" if $@;
@@ -1346,6 +1349,8 @@ sub lazy_start {
my (undef, $eof_p) = PublicInbox::PktOp->pair;
sub {
$exit_code //= eval("POSIX::SIG$_[0] + 128") if @_;
+ $dir_idle->close if $dir_idle; # EPOLL_CTL_DEL
+ $dir_idle = undef; # let RC take care of it
eval 'PublicInbox::LeiNoteEvent::flush_task()';
my $lis = $pil or exit($exit_code // 0);
# closing eof_p triggers \&noop wakeup
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 06/21] input_pipe: {args} is never undefined
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (4 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 05/21] lei: close DirIdle (inotify) early at daemon shutdown Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 07/21] lei: do_env combines fchdir and local Eric Wong
` (14 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
So save us a few ugly defined-ness checks.
---
lib/PublicInbox/InputPipe.pm | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm
index e1e26e20..60a9f01f 100644
--- a/lib/PublicInbox/InputPipe.pm
+++ b/lib/PublicInbox/InputPipe.pm
@@ -1,10 +1,9 @@
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# for reading pipes and sockets off the DS event loop
package PublicInbox::InputPipe;
-use strict;
-use v5.10.1;
+use v5.12;
use parent qw(PublicInbox::DS);
use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
@@ -20,15 +19,15 @@ sub event_step {
my ($self) = @_;
my $r = sysread($self->{sock} // return, my $rbuf, 65536);
if ($r) {
- $self->{cb}->(@{$self->{args} // []}, $rbuf);
+ $self->{cb}->(@{$self->{args}}, $rbuf);
return $self->requeue; # may be regular file or pipe
}
if (defined($r)) { # EOF
- $self->{cb}->(@{$self->{args} // []}, '');
+ $self->{cb}->(@{$self->{args}}, '');
} elsif ($!{EAGAIN}) {
return;
} else { # another error
- $self->{cb}->(@{$self->{args} // []}, undef)
+ $self->{cb}->(@{$self->{args}}, undef)
}
$self->{sock}->blocking ? delete($self->{sock}) : $self->close
}
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 07/21] lei: do_env combines fchdir and local
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (5 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 06/21] input_pipe: {args} is never undefined Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 08/21] lei: get rid of l2m_progress PktOp callback Eric Wong
` (13 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
This will make switching $lei contexts less error-prone
and hopefully save us from some surprising bugs in the future.
Followup-to: 759885e60e59 (lei: ensure --stdin sets %ENV and $current_lei, 2023-09-14)
---
lib/PublicInbox/LEI.pm | 16 ++++--
lib/PublicInbox/LeiAuth.pm | 4 +-
lib/PublicInbox/LeiConfig.pm | 25 ++++----
lib/PublicInbox/LeiInspect.pm | 28 ++++-----
lib/PublicInbox/LeiLcat.pm | 17 +++---
lib/PublicInbox/LeiQuery.pm | 19 +++----
lib/PublicInbox/LeiXSearch.pm | 104 ++++++++++++++++------------------
lib/PublicInbox/PktOp.pm | 15 +++--
8 files changed, 112 insertions(+), 116 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 8362800d..3408551b 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -479,7 +479,6 @@ sub _drop_wq {
# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
sub x_it ($$) {
my ($self, $code) = @_;
- local $current_lei = $self;
# make sure client sees stdout before exit
$self->{1}->autoflush(1) if $self->{1};
stop_pager($self);
@@ -514,7 +513,6 @@ sub qfin { # show message on finalization (LeiFinmsg)
sub fail_handler ($;$$) {
my ($lei, $code, $io) = @_;
- local $current_lei = $lei;
close($io) if $io; # needed to avoid warnings on SIGPIPE
_drop_wq($lei);
x_it($lei, $code // (1 << 8));
@@ -785,11 +783,19 @@ sub lazy_cb ($$$) { # $pfx is _complete_ or lei_
$pkg->can($pfx.$ucmd) : undef;
}
+sub do_env {
+ my $lei = shift;
+ fchdir($lei);
+ my $cb = shift // return ($lei, %{$lei->{env}}) ;
+ local ($current_lei, %ENV) = ($lei, %{$lei->{env}});
+ $cb = $lei->can($cb) if !ref($cb); # $cb may be a scalar sub name
+ eval { $cb->($lei, @_) };
+ $lei->fail($@) if $@;
+}
+
sub dispatch {
my ($self, $cmd, @argv) = @_;
- fchdir($self);
- local %ENV = %{$self->{env}};
- local $current_lei = $self; # for __WARN__
+ local ($current_lei, %ENV) = do_env($self);
$self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
return _help($self, 'no command given') unless defined($cmd);
# do not support Getopt bundling for this
diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index 9b09cecf..76a4410d 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -57,7 +57,7 @@ sub net_merge_all { # called in wq worker via wq_broadcast
# called by top-level lei-daemon when first worker is done with auth
# passes updated net auth info to current workers
sub net_merge_continue {
- my ($wq, $lei, $net_new) = @_;
+ my ($lei, $wq, $net_new) = @_;
$wq->{-net_new} = $net_new; # for "lei up"
$wq->wq_broadcast('PublicInbox::LeiAuth::net_merge_all', $net_new);
$wq->net_merge_all_done($lei); # defined per-WQ
@@ -65,7 +65,7 @@ sub net_merge_continue {
sub op_merge { # prepares PktOp->pair ops
my ($self, $ops, $wq, $lei) = @_;
- $ops->{net_merge_continue} = [ \&net_merge_continue, $wq, $lei ];
+ $ops->{net_merge_continue} = [ \&net_merge_continue, $lei, $wq ];
}
sub new { bless \(my $x), __PACKAGE__ }
diff --git a/lib/PublicInbox/LeiConfig.pm b/lib/PublicInbox/LeiConfig.pm
index 76fc43e7..b3495487 100644
--- a/lib/PublicInbox/LeiConfig.pm
+++ b/lib/PublicInbox/LeiConfig.pm
@@ -16,24 +16,21 @@ sub cfg_do_edit ($;$) {
# run in script/lei foreground
my ($op_c, $op_p) = PublicInbox::PktOp->pair;
# $op_p will EOF when $EDITOR is done
- $op_c->{ops} = { '' => [\&cfg_edit_done, $self] };
+ $op_c->{ops} = { '' => [\&cfg_edit_done, $lei, $self] };
$lei->send_exec_cmd([ @$lei{qw(0 1 2)}, $op_p->{op_p} ], $cmd, $env);
}
-sub cfg_edit_done { # PktOp
- my ($self) = @_;
- eval {
- open my $fh, '+>', undef or die "open($!)";
- my $cfg = do {
- local $self->{lei}->{2} = $fh;
- $self->{lei}->cfg_dump($self->{-f});
- } or do {
- seek($fh, 0, SEEK_SET);
- return cfg_do_edit($self, do { local $/; <$fh> });
- };
- $self->cfg_verify($cfg) if $self->can('cfg_verify');
+sub cfg_edit_done { # PktOp lei->do_env cb
+ my ($lei, $self) = @_;
+ open my $fh, '+>', undef or die "open($!)";
+ my $cfg = do {
+ local $lei->{2} = $fh;
+ $lei->cfg_dump($self->{-f});
+ } or do {
+ seek($fh, 0, SEEK_SET);
+ return cfg_do_edit($self, do { local $/; <$fh> });
};
- $self->{lei}->fail($@) if $@;
+ $self->cfg_verify($cfg) if $self->can('cfg_verify');
}
sub lei_config {
diff --git a/lib/PublicInbox/LeiInspect.pm b/lib/PublicInbox/LeiInspect.pm
index 0455e739..f801610f 100644
--- a/lib/PublicInbox/LeiInspect.pm
+++ b/lib/PublicInbox/LeiInspect.pm
@@ -251,24 +251,20 @@ sub inspect_start ($$) {
$self->wq_close;
}
+sub do_inspect { # lei->do_env cb
+ my ($lei) = @_;
+ my $str = delete $lei->{istr};
+ $str =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+ my $eml = PublicInbox::Eml->new(\$str);
+ inspect_start($lei, [ 'blob:'.$lei->git_oid($eml)->hexdigest,
+ map { "mid:$_" } @{mids($eml)} ]);
+}
+
sub ins_add { # InputPipe->consume callback
my ($lei) = @_; # $_[1] = $rbuf
- if (defined $_[1]) {
- $_[1] eq '' and return eval {
- $lei->fchdir;
- local %ENV = %{$lei->{env}};
- local $PublicInbox::LEI::current_lei = $lei;
- my $str = delete $lei->{istr};
- $str =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
- my $eml = PublicInbox::Eml->new(\$str);
- inspect_start($lei, [
- 'blob:'.$lei->git_oid($eml)->hexdigest,
- map { "mid:$_" } @{mids($eml)} ]);
- };
- $lei->{istr} .= $_[1];
- } else {
- $lei->fail("error reading stdin: $!");
- }
+ $_[1] // return $lei->fail("error reading stdin: $!");
+ return $lei->{istr} .= $_[1] if $_[1] ne '';
+ $lei->do_env(\&do_inspect);
}
sub lei_inspect {
diff --git a/lib/PublicInbox/LeiLcat.pm b/lib/PublicInbox/LeiLcat.pm
index 7ed191c3..72875dc6 100644
--- a/lib/PublicInbox/LeiLcat.pm
+++ b/lib/PublicInbox/LeiLcat.pm
@@ -122,19 +122,18 @@ could not extract Message-ID from $x
@q ? join(' OR ', @q) : $lei->fail("no Message-ID in: @argv");
}
+sub do_lcat { # lei->do_env cb
+ my ($lei) = @_;
+ my @argv = split(/\s+/, $lei->{mset_opt}->{qstr});
+ $lei->{mset_opt}->{qstr} = extract_all($lei, @argv) or return;
+ $lei->_start_query;
+}
+
sub _stdin { # PublicInbox::InputPipe::consume callback for --stdin
my ($lei) = @_; # $_[1] = $rbuf
$_[1] // return $lei->fail("error reading stdin: $!");
return $lei->{mset_opt}->{qstr} .= $_[1] if $_[1] ne '';
- eval {
- $lei->fchdir;
- local %ENV = %{$lei->{env}};
- local $PublicInbox::LEI::current_lei = $lei;
- my @argv = split(/\s+/, $lei->{mset_opt}->{qstr});
- $lei->{mset_opt}->{qstr} = extract_all($lei, @argv) or return;
- $lei->_start_query;
- };
- $lei->fail($@) if $@;
+ $lei->do_env(\&do_lcat);
}
sub lei_lcat {
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index a23354f0..e2d8a096 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -59,20 +59,19 @@ sub _start_query { # used by "lei q" and "lei up"
$lxs->do_query($self);
}
+sub do_qry { # do_env cb
+ my ($lei) = @_;
+ $lei->{mset_opt}->{q_raw} = $lei->{mset_opt}->{qstr};
+ $lei->{lse}->query_approxidate($lei->{lse}->git,
+ $lei->{mset_opt}->{qstr});
+ _start_query($lei);
+}
+
sub qstr_add { # PublicInbox::InputPipe::consume callback for --stdin
my ($lei) = @_; # $_[1] = $rbuf
$_[1] // $lei->fail("error reading stdin: $!");
return $lei->{mset_opt}->{qstr} .= $_[1] if $_[1] ne '';
- eval {
- $lei->fchdir;
- local %ENV = %{$lei->{env}};
- local $PublicInbox::LEI::current_lei = $lei;
- $lei->{mset_opt}->{q_raw} = $lei->{mset_opt}->{qstr};
- $lei->{lse}->query_approxidate($lei->{lse}->git,
- $lei->{mset_opt}->{qstr});
- _start_query($lei);
- };
- $lei->fail($@) if $@;
+ $lei->do_env(\&do_qry);
}
# make the URI||PublicInbox::{Inbox,ExtSearch} a config-file friendly string
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 4e0849e8..8f63149e 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -405,62 +405,54 @@ sub xsearch_done_wait { # awaitpid cb
sub query_done { # EOF callback for main daemon
my ($lei) = @_;
- local $PublicInbox::LEI::current_lei = $lei;
- eval {
- my $l2m = delete $lei->{l2m};
- delete $lei->{lxs};
- ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
- warn "BUG: {sto} missing with --mail-sync";
- $lei->sto_done_request if $lei->{sto};
- if (my $v2w = delete $lei->{v2w}) {
- my $wait = $v2w->wq_do('done'); # may die
- $v2w->wq_close;
- }
- $lei->{ovv}->ovv_end($lei);
- if ($l2m) { # close() calls LeiToMail reap_compress
- if (my $out = delete $lei->{old_1}) {
- if (my $mbout = $lei->{1}) {
- close($mbout) or die <<"";
+ my $l2m = delete $lei->{l2m};
+ delete $lei->{lxs};
+ ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
+ warn "BUG: {sto} missing with --mail-sync";
+ $lei->sto_done_request if $lei->{sto};
+ if (my $v2w = delete $lei->{v2w}) {
+ my $wait = $v2w->wq_do('done'); # may die
+ $v2w->wq_close;
+ }
+ $lei->{ovv}->ovv_end($lei);
+ if ($l2m) { # close() calls LeiToMail reap_compress
+ if (my $out = delete $lei->{old_1}) {
+ if (my $mbout = $lei->{1}) {
+ close($mbout) or die <<"";
Error closing $lei->{ovv}->{dst}: \$!=$! \$?=$?
- }
- $lei->{1} = $out;
- }
- if ($l2m->lock_free) {
- $l2m->poke_dst;
- $lei->poke_mua;
- } else { # mbox users
- delete $l2m->{mbl}; # drop dotlock
}
+ $lei->{1} = $out;
}
- if ($lei->{-progress}) {
- my $tot = $lei->{-mset_total} // 0;
- my $nr_w = $lei->{-nr_write} // 0;
- my $d = ($lei->{-nr_seen} // 0) - $nr_w;
- my $x = "$tot matches";
- $x .= ", $d duplicates" if $d;
- if ($l2m) {
- my $m = "# $nr_w written to " .
- "$lei->{ovv}->{dst} ($x)";
- $nr_w ? $lei->qfin($m) : $lei->qerr($m);
- } else {
- $lei->qerr("# $x");
- }
+ if ($l2m->lock_free) {
+ $l2m->poke_dst;
+ $lei->poke_mua;
+ } else { # mbox users
+ delete $l2m->{mbl}; # drop dotlock
}
- $lei->start_mua if $l2m && !$l2m->lock_free;
- $lei->dclose;
- };
- $lei->fail($@) if $@;
+ }
+ if ($lei->{-progress}) {
+ my $tot = $lei->{-mset_total} // 0;
+ my $nr_w = $lei->{-nr_write} // 0;
+ my $d = ($lei->{-nr_seen} // 0) - $nr_w;
+ my $x = "$tot matches";
+ $x .= ", $d duplicates" if $d;
+ if ($l2m) {
+ my $m = "# $nr_w written to " .
+ "$lei->{ovv}->{dst} ($x)";
+ $nr_w ? $lei->qfin($m) : $lei->qerr($m);
+ } else {
+ $lei->qerr("# $x");
+ }
+ }
+ $lei->start_mua if $l2m && !$l2m->lock_free;
+ $lei->dclose;
}
sub do_post_augment {
my ($lei) = @_;
- local $PublicInbox::LEI::current_lei = $lei;
my $l2m = $lei->{l2m} or return; # client disconnected
- eval {
- $lei->fchdir;
- $l2m->post_augment($lei);
- };
+ eval { $l2m->post_augment($lei) };
my $err = $@;
if ($err) {
if (my $lxs = delete $lei->{lxs}) {
@@ -518,7 +510,7 @@ sub start_query ($$) { # always runs in main (lei-daemon) process
}
sub incr_start_query { # called whenever an l2m shard starts do_post_auth
- my ($self, $lei) = @_;
+ my ($lei, $self) = @_;
my $l2m = $lei->{l2m};
return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers};
start_query($self, $lei);
@@ -534,16 +526,16 @@ sub do_query {
my ($self, $lei) = @_;
my $l2m = $lei->{l2m};
my $ops = {
- 'sigpipe_handler' => [ $lei ],
- 'fail_handler' => [ $lei ],
- 'do_post_augment' => [ \&do_post_augment, $lei ],
- 'incr_post_augment' => [ \&incr_post_augment, $lei ],
+ sigpipe_handler => [ $lei ],
+ fail_handler => [ $lei ],
+ do_post_augment => [ \&do_post_augment, $lei ],
+ incr_post_augment => [ \&incr_post_augment, $lei ],
'' => [ \&query_done, $lei ],
- 'mset_progress' => [ \&mset_progress, $lei ],
- 'l2m_progress' => [ \&l2m_progress, $lei ],
- 'x_it' => [ $lei ],
- 'child_error' => [ $lei ],
- 'incr_start_query' => [ $self, $lei ],
+ mset_progress => [ \&mset_progress, $lei ],
+ l2m_progress => [ \&l2m_progress, $lei ],
+ x_it => [ $lei ],
+ child_error => [ $lei ],
+ incr_start_query => [ \&incr_start_query, $lei, $self ],
};
$lei->{auth}->op_merge($ops, $l2m, $lei) if $l2m && $lei->{auth};
my $end = $lei->pkt_op_pair;
diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm
index dc432307..1bcdd799 100644
--- a/lib/PublicInbox/PktOp.pm
+++ b/lib/PublicInbox/PktOp.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# op dispatch socket, reads a message, runs a sub
@@ -6,8 +6,7 @@
# Used for lei_xsearch and maybe other things
# "command" => [ $sub, @fixed_operands ]
package PublicInbox::PktOp;
-use strict;
-use v5.10.1;
+use v5.12;
use parent qw(PublicInbox::DS);
use Errno qw(EAGAIN ECONNRESET);
use PublicInbox::Syscall qw(EPOLLIN);
@@ -55,7 +54,15 @@ sub event_step {
my $op = $self->{ops}->{$cmd //= $msg};
if ($op) {
my ($obj, @args) = (@$op, @pargs);
- blessed($obj) ? $obj->$cmd(@args) : $obj->(@args);
+ if (blessed($args[0]) && $args[0]->can('do_env')) {
+ my $lei = shift @args;
+ $lei->do_env($obj, @args);
+ } elsif (blessed($obj)) {
+ $obj->can('do_env') ? $obj->do_env($cmd, @args)
+ : $obj->$cmd(@args);
+ } else {
+ $obj->(@args);
+ }
} elsif ($msg ne '') {
die "BUG: unknown message: `$cmd'";
}
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 08/21] lei: get rid of l2m_progress PktOp callback
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (6 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 07/21] lei: do_env combines fchdir and local Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 09/21] t/lei_to_mail: modernize and document test Eric Wong
` (12 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
We already have an ->incr callback we can enhance to support
multiple counters with a single request. Furthermore, we can
just flatten the object graph by storing counters directly in
the $lei object itself to reduce hash lookups.
---
lib/PublicInbox/LEI.pm | 13 ++++++-------
lib/PublicInbox/LeiConvert.pm | 7 ++++---
lib/PublicInbox/LeiTag.pm | 6 +++---
lib/PublicInbox/LeiToMail.pm | 20 ++++++++++----------
lib/PublicInbox/LeiXSearch.pm | 15 ++++-----------
t/lei-tag.t | 3 +++
6 files changed, 30 insertions(+), 34 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 3408551b..fba4edf3 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -628,8 +628,8 @@ sub pkt_op_pair {
}
sub incr {
- my ($self, $field, $nr) = @_;
- $self->{counters}->{$field} += $nr;
+ my $lei = shift;
+ while (my ($f, $n) = splice(@_, 0, 2)) { $lei->{$f} += $n }
}
sub pkt_ops {
@@ -1418,11 +1418,10 @@ sub busy { 1 } # prevent daemon-shutdown if client is connected
# can immediately reread it
sub DESTROY {
my ($self) = @_;
- if (my $counters = delete $self->{counters}) {
- for my $k (sort keys %$counters) {
- my $nr = $counters->{$k};
- $self->child_error(0, "$nr $k messages");
- }
+ for my $k (sort(grep(/\A-nr_/, keys %$self))) {
+ my $nr = $self->{$k};
+ substr($k, 0, length('-nr_'), '');
+ $self->child_error(0, "$nr $k messages");
}
$self->{1}->autoflush(1) if $self->{1};
stop_pager($self);
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 1acd4558..22aba81a 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -34,9 +34,10 @@ sub process_inputs { # via wq_do
$self->SUPER::process_inputs;
my $lei = $self->{lei};
delete $lei->{1};
+ my $l2m = delete $self->{l2m};
delete $self->{wcb}; # commit
- my $nr_w = delete($lei->{-nr_write}) // 0;
- my $d = (delete($lei->{-nr_seen}) // 0) - $nr_w;
+ my $nr_w = delete($l2m->{-nr_write}) // 0;
+ my $d = (delete($l2m->{-nr_seen}) // 0) - $nr_w;
$d = $d ? " ($d duplicates)" : '';
$lei->qerr("# converted $nr_w messages$d");
}
@@ -64,7 +65,7 @@ sub ipc_atfork_child {
my ($self) = @_;
my $lei = $self->{lei};
$lei->_lei_atfork_child;
- my $l2m = delete $lei->{l2m};
+ my $l2m = $lei->{l2m};
if (my $net = $lei->{net}) { # may prompt user once
$net->{mics_cached} = $net->imap_common_init($lei);
$net->{nn_cached} = $net->nntp_common_init($lei);
diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm
index 76bd2d70..320b0355 100644
--- a/lib/PublicInbox/LeiTag.pm
+++ b/lib/PublicInbox/LeiTag.pm
@@ -15,7 +15,7 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
$self->{lei}->{sto}->wq_do('update_xvmd', $xoids, $eml,
$self->{lei}->{vmd_mod});
} else {
- ++$self->{unimported};
+ ++$self->{-nr_unimported};
}
}
@@ -40,8 +40,8 @@ sub lei_tag { # the "lei tag" method
sub note_unimported {
my ($self) = @_;
- my $n = $self->{unimported} or return;
- $self->{lei}->{pkt_op_p}->pkt_do('incr', 'unimported', $n);
+ my $n = $self->{-nr_unimported} or return;
+ $self->{lei}->{pkt_op_p}->pkt_do('incr', -nr_unimported => $n);
}
sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index b9f28ee4..f239da82 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -195,7 +195,7 @@ sub _mbox_write_cb ($$) {
sub { # for git_to_mail
my ($buf, $smsg, $eml) = @_;
$eml //= PublicInbox::Eml->new($buf);
- ++$lei->{-nr_seen};
+ ++$self->{-nr_seen};
return if $dedupe->is_dup($eml, $smsg);
$lse->xsmsg_vmd($smsg) if $lse;
$smsg->{-recent} = 1 if $set_recent;
@@ -206,7 +206,7 @@ sub _mbox_write_cb ($$) {
my $lk = $ovv->lock_for_scope;
$lei->out($$buf);
}
- ++$lei->{-nr_write};
+ ++$self->{-nr_write};
}
}
@@ -291,7 +291,7 @@ sub _maildir_write_cb ($$) {
my ($bref, $smsg, $eml) = @_;
$dst // return $lei->fail; # dst may be undef-ed in last run
- ++$lei->{-nr_seen};
+ ++$self->{-nr_seen};
return if $dedupe && $dedupe->is_dup($eml //
PublicInbox::Eml->new($$bref),
$smsg);
@@ -299,7 +299,7 @@ sub _maildir_write_cb ($$) {
my $n = _buf2maildir($dst, $bref // \($eml->as_string),
$smsg, $dir);
$lms->set_src($smsg->oidbin, $out, $n) if $lms;
- ++$lei->{-nr_write};
+ ++$self->{-nr_write};
}
}
@@ -322,7 +322,7 @@ EOM
my ($bref, $smsg, $eml) = @_;
$mic // return $lei->fail; # mic may be undef-ed in last run
- ++$lei->{-nr_seen};
+ ++$self->{-nr_seen};
return if $dedupe && $dedupe->is_dup($eml //
PublicInbox::Eml->new($$bref),
$smsg);
@@ -335,7 +335,7 @@ EOM
# imap_append returns UID if IMAP server has UIDPLUS extension
($lms && $uid =~ /\A[0-9]+\z/) and
$lms->set_src($smsg->oidbin, $$uri, $uid + 0);
- ++$lei->{-nr_write};
+ ++$self->{-nr_write};
}
}
@@ -366,10 +366,10 @@ sub _v2_write_cb ($$) {
sub { # for git_to_mail
my ($bref, $smsg, $eml) = @_;
$eml //= PublicInbox::Eml->new($bref);
- ++$lei->{-nr_seen};
+ ++$self->{-nr_seen};
return if $dedupe && $dedupe->is_dup($eml, $smsg);
$lei->{v2w}->wq_do('add', $eml); # V2Writable->add
- ++$lei->{-nr_write};
+ ++$self->{-nr_write};
}
}
@@ -796,11 +796,11 @@ sub wq_atexit_child {
local $PublicInbox::DS::in_loop = 0; # waitpid synchronously
my $lei = $self->{lei};
$lei->{ale}->git->async_wait_all;
- my ($nr_w, $nr_s) = delete(@$lei{qw(-nr_write -nr_seen)});
+ my ($nr_w, $nr_s) = delete(@$self{qw(-nr_write -nr_seen)});
delete $self->{wcb};
(($nr_w //= 0) + ($nr_s //= 0)) or return;
return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p};
- $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr_w, $nr_s);
+ $lei->{pkt_op_p}->pkt_do('incr', -nr_write => $nr_w, -nr_seen => $nr_s)
}
# runs on a 1s timer in lei-daemon
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 8f63149e..1caa9d06 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -155,12 +155,6 @@ sub mset_progress {
}
}
-sub l2m_progress {
- my ($lei, $nr_write, $nr_seen) = @_;
- $lei->{-nr_write} += $nr_write;
- $lei->{-nr_seen} += $nr_seen;
-}
-
sub query_one_mset { # for --threads and l2m w/o sort
my ($self, $ibxish) = @_;
local $0 = "$0 query_one_mset";
@@ -354,7 +348,6 @@ sub query_remote_mboxrd {
$self->{import_sto} = $lei->{sto} if $lei->{opt}->{'import-remote'};
for my $uri (@$uris) {
$lei->{-current_url} = $uri->as_string;
- $lei->{-nr_remote_eml} = 0;
my $start = time;
my ($q, $key) = fudge_qstr_time($lei, $uri, $qstr);
$uri->query_form(@qform, q => $q);
@@ -369,9 +362,9 @@ sub query_remote_mboxrd {
my $wait = $self->{import_sto}->wq_do('done');
}
$reap_curl->join;
+ my $nr = delete $lei->{-nr_remote_eml} // 0;
if ($? == 0) {
# don't update if no results, maybe MTA is down
- my $nr = $lei->{-nr_remote_eml};
$lei->{lss}->cfg_set($key, $start) if $key && $nr;
mset_progress($lei, $lei->{-current_url}, $nr, $nr);
next;
@@ -433,8 +426,8 @@ Error closing $lei->{ovv}->{dst}: \$!=$! \$?=$?
}
if ($lei->{-progress}) {
my $tot = $lei->{-mset_total} // 0;
- my $nr_w = $lei->{-nr_write} // 0;
- my $d = ($lei->{-nr_seen} // 0) - $nr_w;
+ my $nr_w = delete($lei->{-nr_write}) // 0;
+ my $d = (delete($lei->{-nr_seen}) // 0) - $nr_w;
my $x = "$tot matches";
$x .= ", $d duplicates" if $d;
if ($l2m) {
@@ -532,7 +525,7 @@ sub do_query {
incr_post_augment => [ \&incr_post_augment, $lei ],
'' => [ \&query_done, $lei ],
mset_progress => [ \&mset_progress, $lei ],
- l2m_progress => [ \&l2m_progress, $lei ],
+ incr => [ $lei ],
x_it => [ $lei ],
child_error => [ $lei ],
incr_start_query => [ \&incr_start_query, $lei, $self ],
diff --git a/t/lei-tag.t b/t/lei-tag.t
index 822677a7..cccf0af6 100644
--- a/t/lei-tag.t
+++ b/t/lei-tag.t
@@ -101,5 +101,8 @@ test_lei(sub {
if (0) { # TODO label+kw search w/ externals
lei_ok(qw(q L:qp), "mid:$mid", '--only', "$ro_home/t2");
}
+ lei_ok qw(tag +L:nope -F eml t/data/binary.patch);
+ like $lei_err, qr/\b1 unimported messages/, 'noted unimported'
+ or diag $lei_err;
});
done_testing;
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 09/21] t/lei_to_mail: modernize and document test
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (7 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 08/21] lei: get rid of l2m_progress PktOp callback Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 10/21] lei: reuse PublicInbox::Config::noop Eric Wong
` (11 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
---
t/lei_to_mail.t | 37 +++++++++++++++++++------------------
1 file changed, 19 insertions(+), 18 deletions(-)
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index d692751c..dbd33909 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -1,9 +1,10 @@
#!perl -w
# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict;
-use v5.10.1;
-use Test::More;
+# tests PublicInbox::LeiToMail internals (unstable API)
+# Not as needed now that lei functionality has been ironed out
+use v5.12;
+use autodie qw(open sysopen unlink);
use PublicInbox::TestCommon;
use PublicInbox::Eml;
use Fcntl qw(SEEK_SET O_RDONLY O_NONBLOCK);
@@ -74,7 +75,7 @@ for my $mbox (@MBOX) {
my ($tmpdir, $for_destroy) = tmpdir();
local $ENV{TMPDIR} = $tmpdir;
-open my $err, '>>', "$tmpdir/lei.err" or BAIL_OUT $!;
+open my $err, '>>', "$tmpdir/lei.err";
my $lei = bless { 2 => $err, cmd => 'test' }, 'PublicInbox::LEI';
my $commit = sub {
$_[0] = undef; # wcb
@@ -114,16 +115,16 @@ my $orig = do {
ok(-f $fn && !-s _, 'empty file created');
$wcb->(\(my $dup = $buf), $deadbeef);
$commit->($wcb);
- open my $fh, '<', $fn or BAIL_OUT $!;
+ open my $fh, '<', $fn;
my $raw = do { local $/; <$fh> };
like($raw, qr/^blah\n/sm, 'wrote content');
- unlink $fn or BAIL_OUT $!;
+ unlink $fn;
$wcb = $wcb_get->($mbox, $fn);
ok(-f $fn && !-s _, 'truncated mbox destination');
$wcb->(\($dup = $buf), $deadbeef);
$commit->($wcb);
- open $fh, '<', $fn or BAIL_OUT $!;
+ open $fh, '<', $fn;
is(do { local $/; <$fh> }, $raw, 'wrote identical content');
$raw;
};
@@ -162,7 +163,7 @@ for my $zsfx (qw(gz bz2 xz)) {
my $uncompressed = xqx([@$dc_cmd, $f]);
is($uncompressed, $orig, "$zsfx works unlocked");
- unlink $f or BAIL_OUT "unlink $!";
+ unlink $f;
$wcb = $wcb_get->($mbox, $f);
$wcb->(\($dup = $buf), { %$deadbeef });
$commit->($wcb);
@@ -201,14 +202,14 @@ my $as_orig = sub {
$eml->as_string;
};
-unlink $fn or BAIL_OUT $!;
+unlink $fn;
if ('default deduplication uses content_hash') {
my $wcb = $wcb_get->('mboxo', $fn);
$deadbeef->{kw} = [];
$wcb->(\(my $x = $buf), $deadbeef) for (1..2);
$commit->($wcb);
my $cmp = '';
- open my $fh, '<', $fn or BAIL_OUT $!;
+ open my $fh, '<', $fn;
PublicInbox::MboxReader->mboxo($fh, sub { $cmp .= $as_orig->(@_) });
is($cmp, $buf, 'only one message written');
@@ -216,7 +217,7 @@ if ('default deduplication uses content_hash') {
$wcb = $wcb_get->('mboxo', $fn);
$wcb->(\($x = $buf . "\nx\n"), $deadbeef) for (1..2);
$commit->($wcb);
- open $fh, '<', $fn or BAIL_OUT $!;
+ open $fh, '<', $fn;
my @x;
PublicInbox::MboxReader->mboxo($fh, sub { push @x, $as_orig->(@_) });
is(scalar(@x), 2, 'augmented mboxo');
@@ -225,12 +226,12 @@ if ('default deduplication uses content_hash') {
}
{ # stdout support
- open my $tmp, '+>', undef or BAIL_OUT $!;
+ open my $tmp, '+>', undef;
local $lei->{1} = $tmp;
my $wcb = $wcb_get->('mboxrd', '/dev/stdout');
$wcb->(\(my $x = $buf), $deadbeef);
$commit->($wcb);
- seek($tmp, 0, SEEK_SET) or BAIL_OUT $!;
+ seek($tmp, 0, SEEK_SET);
my $cmp = '';
PublicInbox::MboxReader->mboxrd($tmp, sub { $cmp .= $as_orig->(@_) });
is($cmp, $buf, 'message written to stdout');
@@ -240,7 +241,7 @@ SKIP: { # FIFO support
use POSIX qw(mkfifo);
my $fn = "$tmpdir/fifo";
mkfifo($fn, 0600) or skip("mkfifo not supported: $!", 1);
- sysopen(my $cat, $fn, O_RDONLY|O_NONBLOCK) or BAIL_OUT $!;
+ sysopen(my $cat, $fn, O_RDONLY|O_NONBLOCK);
my $wcb = $wcb_get->('mboxo', $fn);
$wcb->(\(my $x = $buf), $deadbeef);
$commit->($wcb);
@@ -260,7 +261,7 @@ SKIP: { # FIFO support
my @f;
$mdr->maildir_each_file($md, sub { push @f, shift });
- open my $fh, '<', $f[0] or BAIL_OUT $!;
+ open my $fh, '<', $f[0];
is(do { local $/; <$fh> }, $buf, 'wrote to Maildir');
$wcb = $wcb_get->('maildir', $md);
@@ -271,7 +272,7 @@ SKIP: { # FIFO support
$mdr->maildir_each_file($md, sub { push @x, shift });
is(scalar(@x), 1, 'wrote one new file');
ok(!-f $f[0], 'old file clobbered');
- open $fh, '<', $x[0] or BAIL_OUT $!;
+ open $fh, '<', $x[0];
is(do { local $/; <$fh> }, $buf."\nx\n", 'wrote new file to Maildir');
local $lei->{opt}->{augment} = 1;
@@ -283,9 +284,9 @@ SKIP: { # FIFO support
is(scalar grep(/\A\Q$x[0]\E\z/, @f), 1, 'old file still there');
my @new = grep(!/\A\Q$x[0]\E\z/, @f);
is(scalar @new, 1, '1 new file written (b4dc0ffee skipped)');
- open $fh, '<', $x[0] or BAIL_OUT $!;
+ open $fh, '<', $x[0];
is(do { local $/; <$fh> }, $buf."\nx\n", 'old file untouched');
- open $fh, '<', $new[0] or BAIL_OUT $!;
+ open $fh, '<', $new[0];
is(do { local $/; <$fh> }, $buf."\ny\n", 'new file written');
}
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 10/21] lei: reuse PublicInbox::Config::noop
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (8 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 09/21] t/lei_to_mail: modernize and document test Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 11/21] lei: keep signals blocked on daemon shutdown Eric Wong
` (10 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
There is no need to define our own empty `noop' sub when
PublicInbox::Config already has one and is loaded anyway.
---
lib/PublicInbox/LEI.pm | 14 +++-----------
1 file changed, 3 insertions(+), 11 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index fba4edf3..c9ad46e2 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1217,8 +1217,6 @@ sub event_step_init {
};
}
-sub noop {}
-
sub oldset { $oldset }
sub dump_and_clear_log {
@@ -1364,15 +1362,9 @@ sub lazy_start {
$lis->close; # DS::close
};
};
- my $sig = {
- CHLD => \&PublicInbox::DS::enqueue_reap,
- QUIT => $quit,
- INT => $quit,
- TERM => $quit,
- HUP => \&noop,
- USR1 => \&noop,
- USR2 => \&noop,
- };
+ my $sig = { CHLD => \&PublicInbox::DS::enqueue_reap };
+ $sig->{$_} = $quit for qw(QUIT INT TERM);
+ $sig->{$_} = \&PublicInbox::Config::noop for qw(HUP USR1 USR2);
# for EVFILT_SIGNAL and signalfd behavioral difference:
my @kq_ign = eval { require PublicInbox::DSKQXS } ? keys(%$sig) : ();
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 11/21] lei: keep signals blocked on daemon shutdown
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (9 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 10/21] lei: reuse PublicInbox::Config::noop Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 12/21] mbox_lock: retry on EINTR and use autodie Eric Wong
` (9 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
Since we completely shut down all workers before exiting,
we no longer have to care about missing SIGCHLD wakeups
during shutdown.
---
lib/PublicInbox/LEI.pm | 13 -------------
1 file changed, 13 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index c9ad46e2..d611f5c3 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1365,9 +1365,6 @@ sub lazy_start {
my $sig = { CHLD => \&PublicInbox::DS::enqueue_reap };
$sig->{$_} = $quit for qw(QUIT INT TERM);
$sig->{$_} = \&PublicInbox::Config::noop for qw(HUP USR1 USR2);
- # for EVFILT_SIGNAL and signalfd behavioral difference:
- my @kq_ign = eval { require PublicInbox::DSKQXS } ? keys(%$sig) : ();
-
require PublicInbox::DirIdle;
local $dir_idle = PublicInbox::DirIdle->new(sub {
# just rely on wakeup to hit post_loop_do
@@ -1390,16 +1387,6 @@ sub lazy_start {
# $daemon pipe to `lei' closed, main loop begins:
eval { PublicInbox::DS::event_loop($sig, $oldset) };
warn "event loop error: $@\n" if $@;
-
- # EVFILT_SIGNAL will get a duplicate of all the signals it was sent
- local @SIG{@kq_ign} = map 'IGNORE', @kq_ign;
- PublicInbox::DS::sig_setmask($oldset) if @kq_ign;
-
- # exit() may trigger waitpid via various DESTROY, ensure interruptible
- local $SIG{TERM} = sub { exit(POSIX::SIGTERM + 128) };
- local $SIG{INT} = sub { exit(POSIX::SIGINT + 128) };
- local $SIG{QUIT} = sub { exit(POSIX::SIGQUIT + 128) };
- PublicInbox::DS::sig_setmask($oldset) if !@kq_ign;
dump_and_clear_log();
exit($exit_code // 0);
}
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 12/21] mbox_lock: retry on EINTR and use autodie
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (10 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 11/21] lei: keep signals blocked on daemon shutdown Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 13/21] lock: retry on EINTR, improve error reporting Eric Wong
` (8 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
flock calls which fail with EINTR need to be retried in case
signalfd and kevent aren't available. Using autodie also lets us
drop manual `or die' checks and focus on more important stuff.
---
lib/PublicInbox/MboxLock.pm | 49 +++++++++++++++++--------------------
1 file changed, 22 insertions(+), 27 deletions(-)
diff --git a/lib/PublicInbox/MboxLock.pm b/lib/PublicInbox/MboxLock.pm
index 856b1e21..95aa9862 100644
--- a/lib/PublicInbox/MboxLock.pm
+++ b/lib/PublicInbox/MboxLock.pm
@@ -1,15 +1,15 @@
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# Various mbox locking methods
package PublicInbox::MboxLock;
-use strict;
-use v5.10.1;
+use v5.12;
use PublicInbox::OnDestroy;
use Fcntl qw(:flock F_SETLK F_SETLKW F_RDLCK F_WRLCK
O_CREAT O_EXCL O_WRONLY SEEK_SET);
use Carp qw(croak);
use PublicInbox::DS qw(now); # ugh...
+use autodie qw(chdir opendir unlink);
our $TMPL = do {
if ($^O eq 'linux') { \'s @32' }
@@ -58,16 +58,13 @@ sub acq_dotlock {
rand(0xffffffff), $pid, time);
if (sysopen(my $fh, $tmp, O_CREAT|O_EXCL|O_WRONLY)) {
if (link($tmp, $dot_lock)) {
- unlink($tmp) or die "unlink($tmp): $!";
+ unlink($tmp);
$self->{".lock$pid"} = $dot_lock;
- if (substr($dot_lock, 0, 1) ne '/') {
- opendir(my $dh, '.') or
- die "opendir . $!";
- $self->{dh} = $dh;
- }
+ substr($dot_lock, 0, 1) eq '/' or
+ opendir($self->{dh}, '.');
return;
}
- unlink($tmp) or die "unlink($tmp): $!";
+ unlink($tmp);
select(undef, undef, undef, $self->{delay});
} else {
croak "open $tmp (for $dot_lock): $!" if !$!{EXIST};
@@ -83,18 +80,20 @@ sub acq_flock {
my $end = now + $self->{timeout};
do {
return if flock($self->{fh}, $op);
- select(undef, undef, undef, $self->{delay});
+ if ($!{EWOULDBLOCK}) {
+ select(undef, undef, undef, $self->{delay});
+ } elsif (!$!{EINTR}) {
+ croak "flock($self->{f} ($self->{fh}): $!";
+ }
} while (now < $end);
die "flock timeout $self->{f}: $!\n";
}
sub acq {
my ($cls, $f, $rw, $methods) = @_;
- my $fh;
- unless (open $fh, $rw ? '+>>' : '<', $f) {
- croak "open($f): $!" if $rw || !$!{ENOENT};
- }
- my $self = bless { f => $f, fh => $fh, rw => $rw }, $cls;
+ my $self = bless { f => $f, rw => $rw }, $cls;
+ my $ok = open $self->{fh}, $rw ? '+>>' : '<', $f;
+ croak "open($f): $!" if !$ok && ($rw || !$!{ENOENT});
my $m = "@$methods";
if ($m ne 'none') {
my @m = map {
@@ -116,20 +115,16 @@ sub acq {
$self;
}
-sub _fchdir { chdir($_[0]) } # OnDestroy callback
-
sub DESTROY {
my ($self) = @_;
- if (my $f = $self->{".lock$$"}) {
- my $x;
- if (my $dh = delete $self->{dh}) {
- opendir my $c, '.' or die "opendir . $!";
- $x = PublicInbox::OnDestroy->new(\&_fchdir, $c);
- chdir($dh) or die "chdir (for $f): $!";
- }
- unlink($f) or die "unlink($f): $! (lock stolen?)";
- undef $x;
+ my $f = $self->{".lock$$"} or return;
+ my $x;
+ if (my $dh = delete $self->{dh}) {
+ opendir my $c, '.';
+ $x = PublicInbox::OnDestroy->new(\&chdir, $c);
+ chdir($dh);
}
+ CORE::unlink($f) or die "unlink($f): $! (lock stolen?)";
}
1;
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 13/21] lock: retry on EINTR, improve error reporting
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (11 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 12/21] mbox_lock: retry on EINTR and use autodie Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 14/21] treewide: use PublicInbox::Lock->new Eric Wong
` (7 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
We'll also add a handy ->new function since there are
a bunch of places where we just create objects with bless.
---
lib/PublicInbox/Lock.pm | 52 +++++++++++++++++++++++------------------
1 file changed, 29 insertions(+), 23 deletions(-)
diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm
index 0ee2a8bd..ddaf3312 100644
--- a/lib/PublicInbox/Lock.pm
+++ b/lib/PublicInbox/Lock.pm
@@ -1,36 +1,42 @@
-# Copyright (C) 2018-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# Base class for per-inbox locking
+# Base class for per-inbox locking, subclassed by several
+# only uses {lock_path} and {lockfh} fields
package PublicInbox::Lock;
-use strict;
-use v5.10.1;
-use Fcntl qw(:flock :DEFAULT);
+use v5.12;
+use Fcntl qw(LOCK_UN LOCK_EX O_RDWR O_CREAT);
use Carp qw(croak);
use PublicInbox::OnDestroy;
+use Errno qw(EINTR);
+use autodie qw(close sysopen syswrite);
+
+sub xflock ($$) {
+ until (flock($_[0], $_[1])) { return if $! != EINTR }
+ 1;
+}
+
+sub new { bless { lock_path => $_[1] }, $_[0] }
# we only acquire the flock if creating or reindexing;
# PublicInbox::Import already has the lock on its own.
sub lock_acquire {
my ($self) = @_;
- my $lock_path = $self->{lock_path};
- croak 'already locked '.($lock_path // '(undef)') if $self->{lockfh};
- return unless defined($lock_path);
- sysopen(my $lockfh, $lock_path, O_RDWR|O_CREAT) or
- croak "failed to open $lock_path: $!\n";
- flock($lockfh, LOCK_EX) or croak "lock $lock_path failed: $!\n";
- $self->{lockfh} = $lockfh;
+ my $fn = $self->{lock_path};
+ croak 'already locked '.($fn // '(undef)') if $self->{lockfh};
+ $fn // return;
+ sysopen(my $fh, $fn, O_RDWR|O_CREAT);
+ xflock($fh, LOCK_EX) or croak "LOCK_EX $fn: $!";
+ $self->{lockfh} = $fh;
}
sub lock_release {
my ($self, $wake) = @_;
- defined(my $lock_path = $self->{lock_path}) or return;
- my $lockfh = delete $self->{lockfh} or croak "not locked: $lock_path";
-
- syswrite($lockfh, '.') if $wake;
-
- flock($lockfh, LOCK_UN) or croak "unlock $lock_path failed: $!\n";
- close $lockfh or croak "close $lock_path failed: $!\n";
+ my $fn = $self->{lock_path} // return;
+ my $fh = delete $self->{lockfh} or croak "not locked: $fn";
+ syswrite($fh, '.') if $wake;
+ xflock($fh, LOCK_UN) or croak "LOCK_UN $fn: $!";
+ close $fh; # may detect errors
}
# caller must use return value
@@ -41,13 +47,13 @@ sub lock_for_scope {
}
sub lock_acquire_fast {
- $_[0]->{lockfh} or return lock_acquire($_[0]);
- flock($_[0]->{lockfh}, LOCK_EX) or croak "lock (fast) failed: $!";
+ my $fh = $_[0]->{lockfh} or return lock_acquire($_[0]);
+ xflock($fh, LOCK_EX) or croak "LOCK_EX $_[0]->{lock_path}: $!";
}
sub lock_release_fast {
- flock($_[0]->{lockfh} // return, LOCK_UN) or
- croak "unlock (fast) $_[0]->{lock_path}: $!";
+ xflock($_[0]->{lockfh} // return, LOCK_UN) or
+ croak "LOCK_UN $_[0]->{lock_path}: $!"
}
# caller must use return value
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 14/21] treewide: use PublicInbox::Lock->new
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (12 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 13/21] lock: retry on EINTR, improve error reporting Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 15/21] gcf2: use PublicInbox::Lock Eric Wong
` (6 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
This gets rid of a few bare bless statements and helps
ensure we properly load Lock.pm before using it.
---
lib/PublicInbox/LEI.pm | 2 +-
lib/PublicInbox/LeiMirror.pm | 2 +-
lib/PublicInbox/TestCommon.pm | 7 +++----
t/solver_git.t | 3 ++-
t/v2mirror.t | 2 +-
5 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index d611f5c3..a5a6d321 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1311,7 +1311,7 @@ sub lazy_start {
my ($sock_dir) = ($path =~ m!\A(.+?)/[^/]+\z!);
$errors_log = "$sock_dir/errors.log";
my $addr = pack_sockaddr_un($path);
- my $lk = bless { lock_path => $errors_log }, 'PublicInbox::Lock';
+ my $lk = PublicInbox::Lock->new($errors_log);
umask(077) // die("umask(077): $!");
$lk->lock_acquire;
socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index c99bafc3..9d8a8963 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -873,7 +873,7 @@ sub v2_done { # called via OnDestroy
return if $self->{dry_run} || !keep_going($self);
my $dst = $self->{cur_dst} // $self->{dst};
require PublicInbox::Lock;
- my $lk = bless { lock_path => "$dst/inbox.lock" }, 'PublicInbox::Lock';
+ my $lk = PublicInbox::Lock->new("$dst/inbox.lock");
my $lck = $lk->lock_for_scope($$);
_write_inbox_config($self);
require PublicInbox::MultiGit;
diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm
index 7d0eb2c4..32213fde 100644
--- a/lib/PublicInbox/TestCommon.pm
+++ b/lib/PublicInbox/TestCommon.pm
@@ -742,8 +742,7 @@ sub setup_public_inboxes () {
return @ret if -f $stamp;
require PublicInbox::Lock;
- my $lk = bless { lock_path => "$test_home/setup.lock" },
- 'PublicInbox::Lock';
+ my $lk = PublicInbox::Lock->new("$test_home/setup.lock");
my $end = $lk->lock_for_scope;
return @ret if -f $stamp;
@@ -798,7 +797,7 @@ sub create_coderepo ($$;@) {
my $err = $!;
-d $dir or xbail "mkdir($dir): $err";
}
- my $lk = bless { lock_path => "$dir/creat.lock" }, 'PublicInbox::Lock';
+ my $lk = PublicInbox::Lock->new("$dir/creat.lock");
my $scope = $lk->lock_for_scope;
my $tmpdir = delete $opt{tmpdir};
if (!-f "$dir/creat.stamp") {
@@ -830,7 +829,7 @@ sub create_inbox ($$;@) {
my $err = $!;
-d $dir or xbail "mkdir($dir): $err";
}
- my $lk = bless { lock_path => "$dir/creat.lock" }, 'PublicInbox::Lock';
+ my $lk = PublicInbox::Lock->new("$dir/creat.lock");
$opt{inboxdir} = File::Spec->rel2abs($dir);
$opt{name} //= $ident;
my $scope = $lk->lock_for_scope;
diff --git a/t/solver_git.t b/t/solver_git.t
index f8cafa5e..4f09e05b 100644
--- a/t/solver_git.t
+++ b/t/solver_git.t
@@ -223,7 +223,8 @@ SKIP: {
};
my %bin = (big => $big_size, small => 1);
my %oid; # (small|big) => OID
- my $lk = bless { lock_path => $l }, 'PublicInbox::Lock';
+ require PublicInbox::Lock;
+ my $lk = PublicInbox::Lock->new($l);
my $acq = $lk->lock_for_scope;
my $stamp = "$binfoo/stamp-";
if (open my $fh, '<', $stamp) {
diff --git a/t/v2mirror.t b/t/v2mirror.t
index 88b67bc1..b8824182 100644
--- a/t/v2mirror.t
+++ b/t/v2mirror.t
@@ -335,7 +335,7 @@ SKIP: {
chomp $oldrev;
my ($base) = ($0 =~ m!\b([^/]+)\.[^\.]+\z!);
my $wt = "t/data-gen/$base.pre-manifest-$oldrev";
- my $lk = bless { lock_path => __FILE__ }, 'PublicInbox::Lock';
+ my $lk = PublicInbox::Lock->new(__FILE__);
$lk->lock_acquire;
my $psgi = "$wt/app.psgi";
if (!-f $psgi) { # checkout a pre-manifest.js.gz version
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 15/21] gcf2: use PublicInbox::Lock
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (13 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 14/21] treewide: use PublicInbox::Lock->new Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 16/21] spawn: use autodie and PublicInbox::Lock Eric Wong
` (5 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
It auto-retries on EINTR and saves us the trouble of doing so.
---
lib/PublicInbox/Gcf2.pm | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/Gcf2.pm b/lib/PublicInbox/Gcf2.pm
index 0f4d2bf0..37262e28 100644
--- a/lib/PublicInbox/Gcf2.pm
+++ b/lib/PublicInbox/Gcf2.pm
@@ -6,10 +6,11 @@
package PublicInbox::Gcf2;
use v5.12;
use PublicInbox::Spawn qw(which popen_rd); # may set PERL_INLINE_DIRECTORY
-use Fcntl qw(LOCK_EX SEEK_SET);
+use Fcntl qw(SEEK_SET);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
use IO::Handle; # autoflush
use PublicInbox::Git;
+use PublicInbox::Lock;
BEGIN {
use autodie;
@@ -18,10 +19,10 @@ BEGIN {
# to ~/.cache/public-inbox/inline-c if it exists and Inline::C works
my $inline_dir = $ENV{PERL_INLINE_DIRECTORY} //
die 'PERL_INLINE_DIRECTORY not defined';
- open my $fh, '+>', "$inline_dir/.public-inbox.lock";
# CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking
- flock($fh, LOCK_EX);
+ my $lk = PublicInbox::Lock->new("$inline_dir/.public-inbox.lock");
+ my $fh = $lk->lock_acquire;
my $pc = which($ENV{PKG_CONFIG} // 'pkg-config') //
die "pkg-config missing for libgit2";
@@ -74,6 +75,7 @@ EOM
if ($err) {
seek($fh, 0, SEEK_SET);
my @msg = <$fh>;
+ truncate($fh, 0);
die "Inline::C Gcf2 build failed:\n", $err, "\n", @msg;
}
}
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 16/21] spawn: use autodie and PublicInbox::Lock
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (14 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 15/21] gcf2: use PublicInbox::Lock Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 17/21] xap_helper: retry flock on EINTR Eric Wong
` (4 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
It keeps Spawn.pm less noisy and ensures retries on EINTR.
---
lib/PublicInbox/Spawn.pm | 30 +++++++++++++++---------------
1 file changed, 15 insertions(+), 15 deletions(-)
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 75ef0137..0dffe064 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -17,7 +17,8 @@
package PublicInbox::Spawn;
use v5.12;
use parent qw(Exporter);
-use Fcntl qw(LOCK_EX SEEK_SET);
+use PublicInbox::Lock;
+use Fcntl qw(SEEK_SET);
use IO::Handle ();
use Carp qw(croak);
use PublicInbox::ProcessPipe;
@@ -285,26 +286,25 @@ ALL_LIBC
$all_libc = undef unless -d _ && -w _;
if (defined $all_libc) {
local $ENV{PERL_INLINE_DIRECTORY} = $inline_dir;
- my $f = "$inline_dir/.public-inbox.lock";
- open my $oldout, '>&', \*STDOUT or die "dup(1): $!";
- open my $olderr, '>&', \*STDERR or die "dup(2): $!";
- open my $fh, '+>', $f or die "open($f): $!";
- open STDOUT, '>&', $fh or die "1>$f: $!";
- open STDERR, '>&', $fh or die "2>$f: $!";
+ use autodie;
+ # CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking
+ my $lk = PublicInbox::Lock->new($inline_dir.
+ '/.public-inbox.lock');
+ my $fh = $lk->lock_acquire;
+ open my $oldout, '>&', \*STDOUT;
+ open my $olderr, '>&', \*STDERR;
+ open STDOUT, '>&', $fh;
+ open STDERR, '>&', $fh;
STDERR->autoflush(1);
STDOUT->autoflush(1);
-
- # CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking
- flock($fh, LOCK_EX) or die "LOCK_EX($f): $!";
- eval <<'EOM';
-use Inline C => $all_libc, BUILD_NOISY => 1;
-EOM
+ CORE::eval 'use Inline C => $all_libc, BUILD_NOISY => 1';
my $err = $@;
- open(STDERR, '>&', $olderr) or warn "restore stderr: $!";
- open(STDOUT, '>&', $oldout) or warn "restore stdout: $!";
+ open(STDERR, '>&', $olderr);
+ open(STDOUT, '>&', $oldout);
if ($err) {
seek($fh, 0, SEEK_SET);
my @msg = <$fh>;
+ truncate($fh, 0);
warn "Inline::C build failed:\n", $err, "\n", @msg;
$all_libc = undef;
}
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 17/21] xap_helper: retry flock on EINTR
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (15 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 16/21] spawn: use autodie and PublicInbox::Lock Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 18/21] xap_helper.pm: use EINTR-aware recv_cmd Eric Wong
` (3 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
While signals are currently blocked in these helpers,
they may not always be...
---
lib/PublicInbox/XapHelper.pm | 4 ++--
lib/PublicInbox/xap_helper.h | 6 ++++--
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index 8c2b86d6..f90b283d 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -109,9 +109,9 @@ sub dump_roots_iter ($$$) {
sub dump_roots_flush ($$) {
my ($req, $fh) = @_;
if ($req->{wbuf} ne '') {
- flock($fh, LOCK_EX) or die "flock: $!";
+ until (flock($fh, LOCK_EX)) { die "LOCK_EX: $!" if !$!{EINTR} }
print { $req->{0} } $req->{wbuf} or die "print: $!";
- flock($fh, LOCK_UN) or die "flock: $!";
+ until (flock($fh, LOCK_UN)) { die "LOCK_UN: $!" if !$!{EINTR} }
$req->{wbuf} = '';
}
}
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index 5f04316c..a78a3f76 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -441,7 +441,8 @@ static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
}
drt->wbuf.fp = NULL;
if (!drt->wbuf.len) goto done_free;
- if (flock(drt->root2id_fd, LOCK_EX)) {
+ while (flock(drt->root2id_fd, LOCK_EX)) {
+ if (errno == EINTR) continue;
perror("LOCK_EX");
return false;
}
@@ -456,7 +457,8 @@ static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
return false;
}
} while (drt->wbuf.len);
- if (flock(drt->root2id_fd, LOCK_UN)) {
+ while (flock(drt->root2id_fd, LOCK_UN)) {
+ if (errno == EINTR) continue;
perror("LOCK_UN");
return false;
}
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 18/21] xap_helper.pm: use EINTR-aware recv_cmd
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (16 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 17/21] xap_helper: retry flock on EINTR Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 19/21] spawn: drop checks for directory writability Eric Wong
` (2 subsequent siblings)
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
The code is already loaded, so there's no point in avoiding it.
---
lib/PublicInbox/XapHelper.pm | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index f90b283d..c98708e3 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -177,12 +177,9 @@ sub recv_loop {
my $in = \*STDIN;
while (!defined($parent_pid) || getppid == $parent_pid) {
PublicInbox::DS::sig_setmask($workerset);
- my @fds = $PublicInbox::IPC::recv_cmd->($in, $rbuf, 4096*33);
+ my @fds = PublicInbox::IPC::recv_cmd($in, $rbuf, 4096*33);
scalar(@fds) or exit(66); # EX_NOINPUT
- if (!defined($fds[0])) {
- next if $!{EINTR};
- die "recvmsg: $!";
- }
+ die "recvmsg: $!" if !defined($fds[0]);
PublicInbox::DS::block_signals();
my $req = bless {}, __PACKAGE__;
my $i = 0;
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 19/21] spawn: drop checks for directory writability
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (17 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 18/21] xap_helper.pm: use EINTR-aware recv_cmd Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 20/21] lei: document and local-ize $OPT hashref Eric Wong
2023-10-04 3:49 ` [PATCH 21/21] searchidx: fix redundant `in' in warning message Eric Wong
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
It's a TOCTTOU bug to do stat or access checks, anyways,
so just use the file and let autodie::sysopen PublicInbox::Lock
take care of the rest.
---
lib/PublicInbox/Spawn.pm | 2 --
1 file changed, 2 deletions(-)
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 0dffe064..bb2abe28 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -282,8 +282,6 @@ ALL_LIBC
$ENV{XDG_CACHE_HOME} //
( ($ENV{HOME} // '/nonexistent').'/.cache' )
).'/public-inbox/inline-c';
- warn "$inline_dir exists, not writable\n" if -e $inline_dir && !-w _;
- $all_libc = undef unless -d _ && -w _;
if (defined $all_libc) {
local $ENV{PERL_INLINE_DIRECTORY} = $inline_dir;
use autodie;
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 20/21] lei: document and local-ize $OPT hashref
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (18 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 19/21] spawn: drop checks for directory writability Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
2023-10-04 3:49 ` [PATCH 21/21] searchidx: fix redundant `in' in warning message Eric Wong
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
This variable needs to be visible to a callback running inside
Getopt::Long, but we don't need to keep it around after
LEI->optparse runs.
---
lib/PublicInbox/LEI.pm | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index a5a6d321..5f3147bf 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -37,15 +37,16 @@ $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
my $GLP_PASS = Getopt::Long::Parser->new;
$GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through));
-our %PATH2CFG; # persistent for socket daemon
-our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] }
+our (%PATH2CFG, # persistent for socket daemon
+$MDIR2CFGPATH, # /path/to/maildir => { /path/to/config => [ ino watches ] }
+$OPT, # shared between optparse and opt_dash callback (for Getopt::Long)
+);
# TBD: this is a documentation mechanism to show a subcommand
# (may) pass options through to another command:
sub pass_through { $GLP_PASS }
-my $OPT;
-sub opt_dash ($$) {
+sub opt_dash ($$) { # callback runs inside optparse
my ($spec, $re_str) = @_; # 'limit|n=i', '([0-9]+)'
my ($key) = ($spec =~ m/\A([a-z]+)/g);
my $cb = sub { # Getopt::Long "<>" catch-all handler
@@ -691,7 +692,7 @@ sub optparse ($$$) {
# allow _complete --help to complete, not show help
return 1 if substr($cmd, 0, 1) eq '_';
$self->{cmd} = $cmd;
- $OPT = $self->{opt} //= {};
+ local $OPT = $self->{opt} //= {};
my $info = $CMD{$cmd} // [ '[...]' ];
my ($proto, undef, @spec) = @$info;
my $glp = ref($spec[-1]) eq ref($GLP) ? pop(@spec) : $GLP;
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 21/21] searchidx: fix redundant `in' in warning message
2023-10-04 3:49 [PATCH 00/21] lei + IPC related stuff Eric Wong
` (19 preceding siblings ...)
2023-10-04 3:49 ` [PATCH 20/21] lei: document and local-ize $OPT hashref Eric Wong
@ 2023-10-04 3:49 ` Eric Wong
20 siblings, 0 replies; 22+ messages in thread
From: Eric Wong @ 2023-10-04 3:49 UTC (permalink / raw)
To: meta
Fortunately, I've never actually seen that message...
---
lib/PublicInbox/SearchIdx.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 2a0e06d1..8a571cfb 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -709,7 +709,7 @@ sub xdb_remove {
my $xdb = $self->{xdb} // die 'BUG: missing {xdb}';
for my $docid (@docids) {
eval { $xdb->delete_document($docid) };
- warn "E: #$docid not in in Xapian? $@\n" if $@;
+ warn "E: #$docid not in Xapian? $@\n" if $@;
}
}
^ permalink raw reply related [flat|nested] 22+ messages in thread