I'm not sure if I want to keep 1/7. 4/7 is LONG overdue. Still chasing down difficult-to-reproduce lei2mail worker segfaults, which seem related to LeiDedupe + SharedKV and weird object lifetimes; this is preventing me from doing anything else. Worst case is we disable worker processes, but the performance hit sucks. Eric Wong (7): ipc: wq: support passing fields to workers lei_xsearch: drop repeated "Xapian" in error message ipc: more consistent behavior between worker types lei: less error-prone FD mapping git: synchronous cat_file may return type and OID ipc: move on_destroy scope to inside the eval shared_kv: simplify PID+object guard for cleanup lib/PublicInbox/Git.pm | 9 ++--- lib/PublicInbox/IPC.pm | 46 +++++++++++++--------- lib/PublicInbox/LEI.pm | 56 ++++++++++++++++++++------- lib/PublicInbox/LeiOverview.pm | 9 ++--- lib/PublicInbox/LeiToMail.pm | 8 +--- lib/PublicInbox/LeiXSearch.pm | 70 +++++++++++++++------------------- lib/PublicInbox/SharedKV.pm | 8 ++-- lib/PublicInbox/Spawn.pm | 2 +- t/git.t | 8 ++-- t/shared_kv.t | 2 +- 10 files changed, 119 insertions(+), 99 deletions(-)
This will be useful for pre-sharing certain file handles. --- lib/PublicInbox/IPC.pm | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index c52441f7..838f9530 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -311,14 +311,15 @@ sub wq_do { # always async } } -sub _wq_worker_start ($$) { - my ($self, $oldset) = @_; +sub _wq_worker_start ($$$) { + my ($self, $oldset, $fields) = @_; my $seed = rand(0xffffffff); my $pid = fork // die "fork: $!"; if ($pid == 0) { srand($seed); eval { PublicInbox::DS->Reset }; delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; + @$self{keys %$fields} = values(%$fields) if $fields; $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN)); $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD)); local $0 = $self->{-wq_ident}; @@ -337,7 +338,7 @@ sub _wq_worker_start ($$) { # starts workqueue workers if Sereal or Storable is installed sub wq_workers_start { - my ($self, $ident, $nr_workers, $oldset) = @_; + my ($self, $ident, $nr_workers, $oldset, $fields) = @_; ($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return; return if $self->{-wq_s1}; # idempotent $self->{-wq_s1} = $self->{-wq_s2} = undef; @@ -349,18 +350,18 @@ sub wq_workers_start { my $sigset = $oldset // PublicInbox::DS::block_signals(); $self->{-wq_workers} = {}; $self->{-wq_ident} = $ident; - _wq_worker_start($self, $sigset) for (1..$nr_workers); + _wq_worker_start($self, $sigset, $fields) for (1..$nr_workers); PublicInbox::DS::sig_setmask($sigset) unless $oldset; $self->{-wq_ppid} = $$; } sub wq_worker_incr { # SIGTTIN handler - my ($self, $oldset) = @_; + my ($self, $oldset, $fields) = @_; $self->{-wq_s2} or return; return if wq_workers($self) >= $WQ_MAX_WORKERS; $self->ipc_atfork_prepare; my $sigset = $oldset // PublicInbox::DS::block_signals(); - _wq_worker_start($self, $sigset); + _wq_worker_start($self, $sigset, $fields); PublicInbox::DS::sig_setmask($sigset) unless $oldset; }
Copy+paste error :x --- lib/PublicInbox/LeiXSearch.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index b470c113..9ea2b5f3 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -33,7 +33,7 @@ sub attach_external { my $srch = $ibxish->search or return warn("$desc not indexed for Xapian\n"); my @shards = $srch->xdb_shards_flat or - return warn("$desc has no Xapian shardsXapian\n"); + return warn("$desc has no Xapian shards\n"); if (delete $self->{xdb}) { # XXX: do we need this? # clobber existing {xdb} if amending
Localize signals inside the respective worker loops in case there's circular references. We'll also rely on OnDestroy to trigger exits from the ipc_worker_loop like we do with wq_worker_loop. And also add some more developer documentation to help future developers. The default signals remain different, for now. Cleanup some unnecessary "use" statements while we're loading OnDestroy. --- lib/PublicInbox/IPC.pm | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 838f9530..ece0e8b8 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -2,16 +2,20 @@ # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # base class for remote IPC calls and workqueues, requires Storable or Sereal +# - ipc_do and ipc_worker_* is for a single worker/producer and uses pipes +# - wq_do and wq_worker* is for a single producer and multiple workers, +# using SOCK_SEQPACKET for work distribution +# use ipc_do when you need work done on a certain process +# use wq_do when your work can be done on any idle worker package PublicInbox::IPC; use strict; use v5.10.1; use Carp qw(confess croak); use PublicInbox::DS qw(dwaitpid); use PublicInbox::Spawn; -use POSIX qw(mkfifo WNOHANG); +use PublicInbox::OnDestroy; use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); use Errno qw(EMSGSIZE); -use File::Temp 0.19 (); # 0.19 for ->newdir my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? use constant PIPE_BUF => $^O eq 'linux' ? 
4096 : POSIX::_POSIX_PIPE_BUF(); my $WQ_MAX_WORKERS = 4096; @@ -107,16 +111,22 @@ sub ipc_worker_spawn { if ($pid == 0) { srand($seed); eval { PublicInbox::DS->Reset }; - delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; + delete @$self{qw(-wq_s1 -wq_s2 -wq_workers -wq_ppid)}; $w_req = $r_res = undef; $w_res->autoflush(1); $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); local $0 = $ident; PublicInbox::DS::sig_setmask($sigset); + # ensure we properly exit even if warn() dies: + my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); my $on_destroy = $self->ipc_atfork_child; - eval { ipc_worker_loop($self, $r_req, $w_res) }; + eval { + local %SIG = %SIG; + ipc_worker_loop($self, $r_req, $w_res); + }; die "worker $ident PID:$$ died: $@\n" if $@; - exit; + undef $on_destroy; + undef $end; # trigger exit } PublicInbox::DS::sig_setmask($sigset) unless $oldset; $r_req = $w_res = undef; @@ -320,14 +330,17 @@ sub _wq_worker_start ($$$) { eval { PublicInbox::DS->Reset }; delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; @$self{keys %$fields} = values(%$fields) if $fields; - $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN)); - $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD)); + $SIG{$_} = 'IGNORE' for (qw(PIPE)); + $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD)); local $0 = $self->{-wq_ident}; PublicInbox::DS::sig_setmask($oldset); # ensure we properly exit even if warn() dies: my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); my $on_destroy = $self->ipc_atfork_child; - eval { wq_worker_loop($self) }; + eval { + local %SIG = %SIG; + wq_worker_loop($self); + }; warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; undef $on_destroy; undef $end; # trigger exit
Keeping track of non-standard FDs gets tricky, so make it easier by relying on st_dev/st_ino mapping in the transmitted objects. We'll keep using numbers for the standard FDs since we need to be able to easily redirect them in the producer (main daemon) process for (gzip|bzip2|xz) if writing to a compressed mbox. --- lib/PublicInbox/LEI.pm | 56 +++++++++++++++++++++------- lib/PublicInbox/LeiOverview.pm | 9 ++--- lib/PublicInbox/LeiToMail.pm | 8 +--- lib/PublicInbox/LeiXSearch.pm | 68 +++++++++++++++------------------- lib/PublicInbox/Spawn.pm | 2 +- 5 files changed, 77 insertions(+), 66 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index f5413aab..3ed330f9 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -335,14 +335,27 @@ sub atfork_prepare_wq { } } +sub io_restore ($$) { + my ($dst, $src) = @_; + for my $i (0..2) { # standard FDs + my $io = delete $src->{$i} or next; + $dst->{$i} = $io; + } + for my $i (3..9) { # named (non-standard) FDs + my $io = $src->{$i} or next; + my @st = stat($io) or die "stat $src.$i ($io): $!"; + my $f = delete $dst->{"dev=$st[0],ino=$st[1]"} // next; + $dst->{$f} = $io; + delete $src->{$i}; + } +} + # usage: my %sig = $lei->atfork_child_wq($wq); # local @SIG{keys %sig} = values %sig; sub atfork_child_wq { my ($self, $wq) = @_; - my ($sock, $l2m_wq_s1); - (@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4}); - $self->{sock} = $sock if -S $sock; - $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1 && -S $l2m_wq_s1; + io_restore($self, $wq); + io_restore($self->{l2m}, $wq); %PATH2CFG = (); undef $errors_log; $quit = \&CORE::exit; @@ -355,30 +368,45 @@ sub atfork_child_wq { close(delete $self->{$i}); } # trigger the LeiXSearch $done OpPipe: - syswrite($self->{0}, '!') if $self->{0} && -p $self->{0}; + syswrite($self->{op_pipe}, '!') if $self->{op_pipe}; $SIG{PIPE} = 'DEFAULT'; die bless(\"$_[0]", 'PublicInbox::SIGPIPE'), }); } +sub io_extract ($;@) { + my ($obj, @fields) = @_; + my @io; 
+ for my $f (@fields) { + my $io = delete $obj->{$f} or next; + my @st = stat($io) or die "W: stat $obj.$f ($io): $!"; + $obj->{"dev=$st[0],ino=$st[1]"} = $f; + push @io, $io; + } + @io +} + # usage: ($lei, @io) = $lei->atfork_parent_wq($wq); sub atfork_parent_wq { my ($self, $wq) = @_; my $env = delete $self->{env}; # env is inherited at fork - my $ret = bless { %$self }, ref($self); - if (my $dedupe = delete $ret->{dedupe}) { - $ret->{dedupe} = $wq->deep_clone($dedupe); + my $lei = bless { %$self }, ref($self); + if (my $dedupe = delete $lei->{dedupe}) { + $lei->{dedupe} = $wq->deep_clone($dedupe); } $self->{env} = $env; - delete @$ret{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m - my @io = delete @$ret{0..2}; - $io[3] = delete($ret->{sock}) // $io[2]; - my $l2m = $ret->{l2m}; + delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m + my @io = (delete(@$lei{qw(0 1 2)}), + io_extract($lei, qw(sock op_pipe startq))); + my $l2m = $lei->{l2m}; if ($l2m && $l2m != $wq) { # $wq == lxs - $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1}; + if (my $wq_s1 = $l2m->{-wq_s1}) { + push @io, io_extract($l2m, '-wq_s1'); + $l2m->{-wq_s1} = $wq_s1; + } $l2m->wq_close(1); } - ($ret, @io); + ($lei, @io); } sub _help ($;$) { diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index f9a28138..c67e2747 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -220,14 +220,13 @@ sub ovv_each_smsg_cb { # runs in wq worker usually }; } elsif ($l2m && $l2m->{-wq_s1}) { my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m); - # n.b. 
$io[0] = qry_status_wr, $io[1] = mbox|stdout, - # $io[4] becomes a notification pipe that triggers EOF + # $io[-1] becomes a notification pipe that triggers EOF # in this wq worker when all outstanding ->write_mail # calls are complete - die "BUG: \$io[4] $io[4] unexpected" if $io[4]; - pipe($l2m->{each_smsg_done}, $io[4]) or die "pipe: $!"; - fcntl($io[4], 1031, 4096) if $^O eq 'linux'; + pipe($l2m->{each_smsg_done}, $io[$#io + 1]) or die "pipe: $!"; + fcntl($io[-1], 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ delete @$lei_ipc{qw(l2m opt mset_opt cmd)}; + $lei_ipc->{each_smsg_not_done} = $#io; my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git $self->{git} = $git; my $git_dir = $git->{git_dir}; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 08a1570d..61b546b5 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -460,7 +460,7 @@ sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon sub write_mail { # via ->wq_do my ($self, $git_dir, $smsg, $lei) = @_; - my $not_done = delete $self->{4}; # write end of {each_smsg_done} + my $not_done = delete $self->{$lei->{each_smsg_not_done}}; my $wcb = $self->{wcb} //= do { # first message my %sig = $lei->atfork_child_wq($self); @SIG{keys %sig} = values %sig; # not local @@ -471,12 +471,6 @@ sub write_mail { # via ->wq_do $git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]); } -sub ipc_atfork_prepare { - my ($self) = @_; - # FDs: (done_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr) - $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC -} - # We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY # ordering is unstable at worker exit and may cause segfaults sub reap_gits { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 9ea2b5f3..e69b637c 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -109,9 +109,9 @@ sub wait_startq ($) { sub 
query_thread_mset { # for --thread my ($self, $lei, $ibxish) = @_; local $0 = "$0 query_thread_mset"; - my $startq = delete $self->{5}; my %sig = $lei->atfork_child_wq($self); local @SIG{keys %sig} = values %sig; + my $startq = delete $lei->{startq}; my ($srch, $over) = ($ibxish->search, $ibxish->over); unless ($srch && $over) { @@ -145,9 +145,9 @@ sub query_thread_mset { # for --thread sub query_mset { # non-parallel for non-"--thread" users my ($self, $lei) = @_; local $0 = "$0 query_mset"; - my $startq = delete $self->{5}; my %sig = $lei->atfork_child_wq($self); local @SIG{keys %sig} = values %sig; + my $startq = delete $lei->{startq}; my $mo = { %{$lei->{mset_opt}} }; my $mset; for my $loc (locals($self)) { @@ -173,7 +173,7 @@ sub each_eml { # callback for MboxReader->mboxrd $smsg->parse_references($eml, mids($eml)); $smsg->{$_} //= '' for qw(from to cc ds subject references mid); delete @$smsg{qw(From Subject -ds -ts)}; - if (my $startq = delete($self->{5})) { wait_startq($startq) } + if (my $startq = delete($lei->{startq})) { wait_startq($startq) } $each_smsg->($smsg, undef, $eml); } @@ -352,11 +352,12 @@ sub query_prepare { # called by wq_do my ($self, $lei) = @_; local $0 = "$0 query_prepare"; my %sig = $lei->atfork_child_wq($self); - -p $lei->{0} or die "BUG: \$done pipe expected"; + -p $lei->{op_pipe} or die "BUG: \$done pipe expected"; local @SIG{keys %sig} = values %sig; + delete $lei->{l2m}->{-wq_s1}; eval { $lei->{l2m}->do_augment($lei) }; $lei->fail($@) if $@; - syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!"; + syswrite($lei->{op_pipe}, '.') == 1 or die "do_post_augment trigger: $!" 
} sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers @@ -370,56 +371,45 @@ sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers } sub do_query { - my ($self, $lei_orig) = @_; - my ($lei, @io) = $lei_orig->atfork_parent_wq($self); - $io[0] = undef; - pipe(my $done, $io[0]) or die "pipe $!"; - $lei_orig->{1}->autoflush(1); + my ($self, $lei) = @_; + $lei->{1}->autoflush(1); + my ($au_done, $zpipe); + my $l2m = $lei->{l2m}; + if ($l2m) { + pipe($lei->{startq}, $au_done) or die "pipe: $!"; + # 1031: F_SETPIPE_SZ + fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux'; + $zpipe = $l2m->pre_augment($lei); + } + pipe(my $done, $lei->{op_pipe}) or die "pipe $!"; + my ($lei_ipc, @io) = $lei->atfork_parent_wq($self); + delete($lei->{op_pipe}); - $lei_orig->event_step_init; # wait for shutdowns + $lei->event_step_init; # wait for shutdowns my $done_op = { - '' => [ \&query_done, $lei_orig ], - '!' => [ \&sigpipe_handler, $lei_orig ] + '' => [ \&query_done, $lei ], + '!' => [ \&sigpipe_handler, $lei ] }; - my $in_loop = exists $lei_orig->{sock}; + my $in_loop = exists $lei->{sock}; $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop); - my $l2m = $lei->{l2m}; if ($l2m) { - # may redirect $lei->{1} for mbox - my $zpipe = $l2m->pre_augment($lei_orig); - $io[1] = $lei_orig->{1}; - pipe(my ($startq, $au_done)) or die "pipe: $!"; - $done_op->{'.'} = [ \&do_post_augment, $lei_orig, - $zpipe, $au_done ]; - local $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1} - die "BUG: unexpected \$io[5]: $io[5]" if $io[5]; - $self->wq_do('query_prepare', \@io, $lei); - fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ - $io[5] = $startq; + $done_op->{'.'} = [ \&do_post_augment, $lei, $zpipe, $au_done ]; + $self->wq_do('query_prepare', \@io, $lei_ipc); $io[1] = $zpipe->[1] if $zpipe; } - start_query($self, \@io, $lei); + start_query($self, \@io, $lei_ipc); $self->wq_close(1); unless ($in_loop) { - # for the $lei->atfork_child_wq PIPE handler: + # for the 
$lei_ipc->atfork_child_wq PIPE handler: while ($done->{sock}) { $done->event_step } } } -sub ipc_atfork_prepare { - my ($self) = @_; - if (exists $self->{remotes}) { - require PublicInbox::MboxReader; - require IO::Uncompress::Gunzip; - } - # FDS: (0: done_wr, 1: stdout|mbox, 2: stderr, - # 3: sock, 4: $l2m->{-wq_s1}, 5: $startq) - $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC -} - sub add_uri { my ($self, $uri) = @_; if (my $curl = $self->{curl} //= which('curl') // 0) { + require PublicInbox::MboxReader; + require IO::Uncompress::Gunzip; push @{$self->{remotes}}, $uri; } else { warn "curl missing, ignoring $uri\n"; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index ef4885c1..1842899c 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS'; #include <sys/socket.h> #if defined(CMSG_SPACE) && defined(CMSG_LEN) -#define SEND_FD_CAPA 6 +#define SEND_FD_CAPA 10 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int)) union my_cmsg { struct cmsghdr hdr;
Instead of forcing callers to set a variable to write into, we'll just rely on wantarray. --- lib/PublicInbox/Git.pm | 9 ++++----- t/git.t | 8 ++++---- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 3d97300c..c6c1c802 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -240,17 +240,16 @@ sub batch_prepare ($) { } sub _cat_file_cb { - my ($bref, undef, undef, $size, $result) = @_; - @$result = ($bref, $size); + my ($bref, $oid, $type, $size, $result) = @_; + @$result = ($bref, $oid, $type, $size); } sub cat_file { - my ($self, $oid, $sizeref) = @_; + my ($self, $oid) = @_; my $result = []; cat_async($self, $oid, \&_cat_file_cb, $result); cat_async_wait($self); - $$sizeref = $result->[1] if $sizeref; - $result->[0]; + wantarray ? @$result : $result->[0]; } sub check_async_step ($$) { diff --git a/t/git.t b/t/git.t index 377652ca..0c85e492 100644 --- a/t/git.t +++ b/t/git.t @@ -70,10 +70,10 @@ if (1) { chomp $buf; my $gcf = PublicInbox::Git->new($dir); - my $rsize; - my $x = $gcf->cat_file($buf, \$rsize); - is($rsize, $size, 'got correct size ref on big file'); - is(length($$x), $size, 'read correct number of bytes'); + my @x = $gcf->cat_file($buf); + is($x[2], 'blob', 'got blob on wantarray'); + is($x[3], $size, 'got correct size ref on big file'); + is(length(${$x[0]}), $size, 'read correct number of bytes'); my $ref = $gcf->qx(qw(cat-file blob), $buf); is($?, 0, 'no error on scalar success');
It saves us a line of code in each worker type, since $on_destroy now goes out of scope when the eval block ends. --- lib/PublicInbox/IPC.pm | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index ece0e8b8..d2ff038d 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -119,13 +119,12 @@ sub ipc_worker_spawn { PublicInbox::DS::sig_setmask($sigset); # ensure we properly exit even if warn() dies: my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); - my $on_destroy = $self->ipc_atfork_child; eval { + my $on_destroy = $self->ipc_atfork_child; local %SIG = %SIG; ipc_worker_loop($self, $r_req, $w_res); }; die "worker $ident PID:$$ died: $@\n" if $@; - undef $on_destroy; undef $end; # trigger exit } PublicInbox::DS::sig_setmask($sigset) unless $oldset; @@ -336,13 +335,12 @@ sub _wq_worker_start ($$$) { PublicInbox::DS::sig_setmask($oldset); # ensure we properly exit even if warn() dies: my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); - my $on_destroy = $self->ipc_atfork_child; eval { + my $on_destroy = $self->ipc_atfork_child; local %SIG = %SIG; wq_worker_loop($self); }; warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; - undef $on_destroy; undef $end; # trigger exit } else { $self->{-wq_workers}->{$pid} = \undef;
We don't need another hash slot when we can encode the object ID and PID owner into the field name itself. --- lib/PublicInbox/SharedKV.pm | 8 +++----- t/shared_kv.t | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm index 072c94ca..94f2429f 100644 --- a/lib/PublicInbox/SharedKV.pm +++ b/lib/PublicInbox/SharedKV.pm @@ -44,10 +44,7 @@ CREATE TABLE IF NOT EXISTS kv ( sub new { my ($cls, $dir, $base, $opt) = @_; my $self = bless { opt => $opt }, $cls; - unless (defined $dir) { - $self->{tmpdir} = $dir = tempdir('skv-XXXXXX', TMPDIR => 1); - $self->{tmpid} = "$$.$self"; - } + $dir //= $self->{"tmp$$.$self"} = tempdir("skv.$$-XXXX", TMPDIR => 1); -d $dir or mkdir($dir) or die "mkdir($dir): $!"; $base //= ''; my $f = $self->{filename} = "$dir/$base.sqlite3"; @@ -148,7 +145,8 @@ SELECT COUNT(k) FROM kv sub DESTROY { my ($self) = @_; - rmtree($self->{tmpdir}) if ($self->{tmpid} // '') eq "$$.$self"; + my $dir = delete $self->{"tmp$$.$self"} or return; + rmtree($dir); } 1; diff --git a/t/shared_kv.t b/t/shared_kv.t index 6f6374f2..fcae688a 100644 --- a/t/shared_kv.t +++ b/t/shared_kv.t @@ -9,7 +9,7 @@ use_ok 'PublicInbox::SharedKV'; my ($tmpdir, $for_destroy) = tmpdir(); local $ENV{TMPDIR} = $tmpdir; my $skv = PublicInbox::SharedKV->new; -my $skv_tmpdir = $skv->{tmpdir}; +my $skv_tmpdir = $skv->{"tmp$$.$skv"}; ok(-d $skv_tmpdir, 'created a temporary dir'); $skv->dbh; my $dead = "\xde\xad";