* [PATCH 01/12] lei_overview: rename {relevance} => {pct}
2021-01-21 19:46 [PATCH 00/12] lei: another dump Eric Wong
@ 2021-01-21 19:46 ` Eric Wong
2021-01-21 19:46 ` [PATCH 02/12] lei q: retrieve keywords for local, non-external messages Eric Wong
` (10 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2021-01-21 19:46 UTC (permalink / raw)
To: meta
The old name was too long compared to the rest of the field
names. With the Xapian method being named ->get_percent,
"pct" is a well known abbreviation for "percent" and already
used internally by our wrapper.
...And clean up some excess whitespace while we're in the area.
---
lib/PublicInbox/LeiOverview.pm | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index cab2b055..8799f1cc 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -135,11 +135,9 @@ sub _unbless_smsg {
delete @$smsg{qw(lines bytes num tid)};
$smsg->{rt} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
$smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
- $smsg->{relevance} = get_pct($mitem) if $mitem;
-
+ $smsg->{pct} = get_pct($mitem) if $mitem;
if (my $r = delete $smsg->{references}) {
- $smsg->{refs} = [
- map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
+ $smsg->{refs} = [ map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
}
if (my $m = delete($smsg->{mid})) {
$smsg->{'m'} = "<$m>";
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 02/12] lei q: retrieve keywords for local, non-external messages
2021-01-21 19:46 [PATCH 00/12] lei: another dump Eric Wong
2021-01-21 19:46 ` [PATCH 01/12] lei_overview: rename {relevance} => {pct} Eric Wong
@ 2021-01-21 19:46 ` Eric Wong
2021-01-21 19:46 ` [PATCH 03/12] lei_xsearch: eliminate some unused, commented-out code Eric Wong
` (9 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2021-01-21 19:46 UTC (permalink / raw)
To: meta
This isn't tested for now, so maybe it works.
---
lib/PublicInbox/LeiOverview.pm | 8 +++-----
lib/PublicInbox/LeiSearch.pm | 16 +++-------------
lib/PublicInbox/LeiXSearch.pm | 14 ++++++++++----
lib/PublicInbox/Search.pm | 20 +++++++++++++++++++-
4 files changed, 35 insertions(+), 23 deletions(-)
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 8799f1cc..47d9eb31 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -224,9 +224,8 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
my $git_dir = $git->{git_dir};
sub {
my ($smsg, $mitem) = @_;
- my $kw = []; # TODO get from mitem
$l2m->wq_do('write_mail', \@io, $git_dir,
- $smsg->{blob}, $lei_ipc, $kw)
+ $smsg->{blob}, $lei_ipc, $smsg->{kw});
}
} elsif ($l2m) {
my $wcb = $l2m->write_cb($lei);
@@ -235,8 +234,8 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
my $g2m = $l2m->can('git_to_mail');
sub {
my ($smsg, $mitem) = @_;
- my $kw = []; # TODO get from mitem
- $git->cat_async($smsg->{blob}, $g2m, [ $wcb, $kw ]);
+ $git->cat_async($smsg->{blob}, $g2m,
+ [ $wcb, $smsg->{kw} ]);
};
} elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
@@ -266,7 +265,6 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
$lei->{ovv_buf} = \(my $buf = '');
sub {
my ($smsg, $mitem) = @_;
- delete @$smsg{qw(tid num)};
$buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
if (length($buf) > 65536) {
my $lk = $self->lock_for_scope;
diff --git a/lib/PublicInbox/LeiSearch.pm b/lib/PublicInbox/LeiSearch.pm
index b7e337de..440bacf5 100644
--- a/lib/PublicInbox/LeiSearch.pm
+++ b/lib/PublicInbox/LeiSearch.pm
@@ -5,7 +5,7 @@ package PublicInbox::LeiSearch;
use strict;
use v5.10.1;
use parent qw(PublicInbox::ExtSearch);
-use PublicInbox::Search;
+use PublicInbox::Search qw(xap_terms);
# get combined docid from over.num:
# (not generic Xapian, only works with our sharding scheme)
@@ -19,19 +19,9 @@ sub msg_keywords {
my ($self, $num) = @_; # num_or_mitem
my $xdb = $self->xdb; # set {nshard};
my $docid = ref($num) ? $num->get_docid : num2docid($self, $num);
- my %kw;
- eval {
- my $end = $xdb->termlist_end($docid);
- my $cur = $xdb->termlist_begin($docid);
- for (; $cur != $end; $cur++) {
- $cur->skip_to('K');
- last if $cur == $end;
- my $kw = $cur->get_termname;
- $kw =~ s/\AK//s and $kw{$kw} = undef;
- }
- };
+ my $kw = xap_terms('K', $xdb, $docid);
warn "E: #$docid ($num): $@\n" if $@;
- wantarray ? sort(keys(%kw)) : \%kw;
+ wantarray ? sort(keys(%$kw)) : $kw;
}
1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index a6d827de..d7688ede 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -13,6 +13,7 @@ use PublicInbox::OpPipe;
use PublicInbox::Import;
use File::Temp 0.19 (); # 0.19 for ->newdir
use File::Spec ();
+use PublicInbox::Search qw(xap_terms);
sub new {
my ($class) = @_;
@@ -74,7 +75,12 @@ sub smsg_for {
my $docid = $mitem->get_docid;
my $shard = ($docid - 1) % $nshard;
my $num = int(($docid - 1) / $nshard) + 1;
- my $smsg = $self->{shard2ibx}->[$shard]->over->get_art($num);
+ my $ibx = $self->{shard2ibx}->[$shard];
+ my $smsg = $ibx->over->get_art($num);
+ if (ref($ibx->can('msg_keywords'))) {
+ my $kw = xap_terms('K', $mitem->get_document);
+ $smsg->{kw} = [ sort keys %$kw ];
+ }
$smsg->{docid} = $docid;
$smsg;
}
@@ -153,11 +159,11 @@ sub query_mset { # non-parallel for non-"--thread" users
$dedupe->prepare_dedupe;
do {
$mset = $self->mset($mo->{qstr}, $mo);
- for my $it ($mset->items) {
- my $smsg = smsg_for($self, $it) or next;
+ for my $mitem ($mset->items) {
+ my $smsg = smsg_for($self, $mitem) or next;
wait_startq($startq) if $startq;
next if $dedupe->is_smsg_dup($smsg);
- $each_smsg->($smsg, $it);
+ $each_smsg->($smsg, $mitem);
}
} while (_mset_more($mset, $mo));
undef $each_smsg; # drops @io for l2m->{each_smsg_done}
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index a4b40f94..7c6a16be 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -6,7 +6,7 @@
package PublicInbox::Search;
use strict;
use parent qw(Exporter);
-our @EXPORT_OK = qw(retry_reopen int_val get_pct);
+our @EXPORT_OK = qw(retry_reopen int_val get_pct xap_terms);
use List::Util qw(max);
# values for searching, changing the numeric value breaks
@@ -432,4 +432,22 @@ sub get_pct ($) { # mset item
$n > 99 ? 99 : $n;
}
+sub xap_terms ($$;@) {
+ my ($pfx, $xdb_or_doc, @docid) = @_; # @docid may be empty ()
+ my %ret;
+ eval {
+ my $end = $xdb_or_doc->termlist_end(@docid);
+ my $cur = $xdb_or_doc->termlist_begin(@docid);
+ for (; $cur != $end; $cur++) {
+ $cur->skip_to($pfx);
+ last if $cur == $end;
+ my $tn = $cur->get_termname;
+ if (index($tn, $pfx) == 0) {
+ $ret{substr($tn, length($pfx))} = undef;
+ }
+ }
+ };
+ \%ret;
+}
+
1;
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 03/12] lei_xsearch: eliminate some unused, commented-out code
2021-01-21 19:46 [PATCH 00/12] lei: another dump Eric Wong
2021-01-21 19:46 ` [PATCH 01/12] lei_overview: rename {relevance} => {pct} Eric Wong
2021-01-21 19:46 ` [PATCH 02/12] lei q: retrieve keywords for local, non-external messages Eric Wong
@ 2021-01-21 19:46 ` Eric Wong
2021-01-21 19:46 ` [PATCH 04/12] lei: show {pct} and {oid} in From_ lines and filenames Eric Wong
` (8 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2021-01-21 19:46 UTC (permalink / raw)
To: meta
---
lib/PublicInbox/LeiXSearch.pm | 5 -----
1 file changed, 5 deletions(-)
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index d7688ede..13611882 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -291,11 +291,6 @@ sub do_query {
my @pids = $self->wq_close;
# for the $lei->atfork_child_wq PIPE handler:
$done_op->{'!'}->[3] = \@pids;
- # $done->event_step;
- # my $ipc_worker_reap = $self->can('ipc_worker_reap');
- # if (my $l2m_pids = delete $self->{l2m_pids}) {
- # dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
- # }
while ($done->{sock}) { $done->event_step }
my $ipc_worker_reap = $self->can('ipc_worker_reap');
dwaitpid($_, $ipc_worker_reap, $self) for @pids;
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 04/12] lei: show {pct} and {oid} in From_ lines and filenames
2021-01-21 19:46 [PATCH 00/12] lei: another dump Eric Wong
` (2 preceding siblings ...)
2021-01-21 19:46 ` [PATCH 03/12] lei_xsearch: eliminate some unused, commented-out code Eric Wong
@ 2021-01-21 19:46 ` Eric Wong
2021-01-21 19:46 ` [PATCH 05/12] lei: fix inadvertant FD sharing Eric Wong
` (7 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2021-01-21 19:46 UTC (permalink / raw)
To: meta
From_ lines are shown when mbox* variants are output to stdout,
making {oid} and {pct} information visible without risking being
propagated to other importer processes if they were in
lei-specific X-* headers.
Maildirs already had OIDs in the filename, now they gain Xapian
{pct} in case anybody cares.
---
lib/PublicInbox/LeiOverview.pm | 9 ++---
lib/PublicInbox/LeiToMail.pm | 60 +++++++++++++++++++---------------
t/lei_to_mail.t | 41 +++++++++++++----------
3 files changed, 61 insertions(+), 49 deletions(-)
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 47d9eb31..7a4fa857 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -224,8 +224,9 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
my $git_dir = $git->{git_dir};
sub {
my ($smsg, $mitem) = @_;
- $l2m->wq_do('write_mail', \@io, $git_dir,
- $smsg->{blob}, $lei_ipc, $smsg->{kw});
+ $smsg->{pct} = get_pct($mitem) if $mitem;
+ $l2m->wq_do('write_mail', \@io, $git_dir, $smsg,
+ $lei_ipc);
}
} elsif ($l2m) {
my $wcb = $l2m->write_cb($lei);
@@ -234,8 +235,8 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
my $g2m = $l2m->can('git_to_mail');
sub {
my ($smsg, $mitem) = @_;
- $git->cat_async($smsg->{blob}, $g2m,
- [ $wcb, $smsg->{kw} ]);
+ $smsg->{pct} = get_pct($mitem) if $mitem;
+ $git->cat_async($smsg->{blob}, $g2m, [ $wcb, $smsg ]);
};
} elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 1be0b09c..3dcce9e7 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -32,14 +32,14 @@ my %kw2status = (
);
sub _mbox_hdr_buf ($$$) {
- my ($eml, $type, $kw) = @_;
+ my ($eml, $type, $smsg) = @_;
$eml->header_set($_) for (qw(Lines Bytes Content-Length));
# Messages are always 'O' (non-\Recent in IMAP), it saves
# MUAs the trouble of rewriting the mbox if no other
# changes are made
my %hdr = (Status => [ 'O' ]); # set Status, X-Status
- for my $k (@$kw) {
+ for my $k (@{$smsg->{kw} // []}) {
if (my $ent = $kw2status{$k}) {
push @{$hdr{$ent->[0]}}, $ent->[1];
} else { # X-Label?
@@ -53,9 +53,11 @@ sub _mbox_hdr_buf ($$$) {
# fixup old bug from import (pre-a0c07cba0e5d8b6a)
$$buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+ my $ident = $smsg->{blob} // 'lei';
+ if (defined(my $pct = $smsg->{pct})) { $ident .= "=$pct" }
substr($$buf, 0, 0, # prepend From line
- "From lei\@$type Thu Jan 1 00:00:00 1970$eml->{crlf}");
+ "From $ident\@$type Thu Jan 1 00:00:00 1970$eml->{crlf}");
$buf;
}
@@ -71,8 +73,8 @@ sub _print_full {
}
sub eml2mboxrd ($;$) {
- my ($eml, $kw) = @_;
- my $buf = _mbox_hdr_buf($eml, 'mboxrd', $kw);
+ my ($eml, $smsg) = @_;
+ my $buf = _mbox_hdr_buf($eml, 'mboxrd', $smsg);
if (my $bdy = delete $eml->{bdy}) {
$$bdy =~ s/^(>*From )/>$1/gm;
$$buf .= $eml->{crlf};
@@ -84,8 +86,8 @@ sub eml2mboxrd ($;$) {
}
sub eml2mboxo {
- my ($eml, $kw) = @_;
- my $buf = _mbox_hdr_buf($eml, 'mboxo', $kw);
+ my ($eml, $smsg) = @_;
+ my $buf = _mbox_hdr_buf($eml, 'mboxo', $smsg);
if (my $bdy = delete $eml->{bdy}) {
$$bdy =~ s/^From />From /gm;
$$buf .= $eml->{crlf};
@@ -108,8 +110,8 @@ sub _mboxcl_common ($$$) {
# mboxcl still escapes "From " lines
sub eml2mboxcl {
- my ($eml, $kw) = @_;
- my $buf = _mbox_hdr_buf($eml, 'mboxcl', $kw);
+ my ($eml, $smsg) = @_;
+ my $buf = _mbox_hdr_buf($eml, 'mboxcl', $smsg);
my $crlf = $eml->{crlf};
if (my $bdy = delete $eml->{bdy}) {
$$bdy =~ s/^From />From /gm;
@@ -121,8 +123,8 @@ sub eml2mboxcl {
# mboxcl2 has no "From " escaping
sub eml2mboxcl2 {
- my ($eml, $kw) = @_;
- my $buf = _mbox_hdr_buf($eml, 'mboxcl2', $kw);
+ my ($eml, $smsg) = @_;
+ my $buf = _mbox_hdr_buf($eml, 'mboxcl2', $smsg);
my $crlf = $eml->{crlf};
if (my $bdy = delete $eml->{bdy}) {
_mboxcl_common($buf, $bdy, $crlf);
@@ -140,10 +142,11 @@ sub git_to_mail { # git->cat_async callback
warn "unexpected type=$type for $oid\n";
}
}
- if ($size > 0) {
- my ($write_cb, $kw) = @$arg;
- $write_cb->($bref, $oid, $kw);
+ my ($write_cb, $smsg) = @$arg;
+ if ($smsg->{blob} ne $oid) {
+ die "BUG: expected=$smsg->{blob} got=$oid";
}
+ $write_cb->($bref, $smsg) if $size > 0;
}
sub reap_compress { # dwaitpid callback
@@ -247,11 +250,11 @@ sub _mbox_write_cb ($$) {
my $dedupe = $lei->{dedupe};
$dedupe->prepare_dedupe;
sub { # for git_to_mail
- my ($buf, $oid, $kw) = @_;
+ my ($buf, $smsg) = @_;
return unless $out;
my $eml = PublicInbox::Eml->new($buf);
- if (!$dedupe->is_dup($eml, $oid)) {
- $buf = $eml2mbox->($eml, $kw);
+ if (!$dedupe->is_dup($eml, $smsg->{blob})) {
+ $buf = $eml2mbox->($eml, $smsg);
my $lk = $ovv->lock_for_scope;
eval { $write->($out, $buf) };
if ($@) {
@@ -283,12 +286,15 @@ sub _augment_file { # _maildir_each_file cb
sub _unlink { unlink($_[0]) }
sub _buf2maildir {
- my ($dst, $buf, $oid, $kw) = @_;
+ my ($dst, $buf, $smsg) = @_;
+ my $kw = $smsg->{kw} // [];
my $sfx = join('', sort(map { $kw2char{$_} // () } @$kw));
my $rand = ''; # chosen by die roll :P
my ($tmp, $fh, $final);
+ my $common = $smsg->{blob};
+ if (defined(my $pct = $smsg->{pct})) { $common .= "=$pct" }
do {
- $tmp = $dst.'tmp/'.$rand."oid=$oid";
+ $tmp = $dst.'tmp/'.$rand.$common;
} while (!sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY) &&
$! == EEXIST && ($rand = int(rand 0x7fffffff).','));
if (print $fh $$buf and close($fh)) {
@@ -299,14 +305,14 @@ sub _buf2maildir {
$dst .= 'cur/';
$rand = '';
do {
- $final = $dst.$rand."oid=$oid:2,$sfx";
+ $final = $dst.$rand.$common.':2,'.$sfx;
} while (!link($tmp, $final) && $! == EEXIST &&
($rand = int(rand 0x7fffffff).','));
unlink($tmp) or warn "W: failed to unlink $tmp: $!\n";
} else {
my $err = $!;
unlink($tmp);
- die "Error writing $oid to $dst: $err";
+ die "Error writing $smsg->{blob} to $dst: $err";
}
}
@@ -316,12 +322,12 @@ sub _maildir_write_cb ($$) {
$dedupe->prepare_dedupe;
my $dst = $lei->{ovv}->{dst};
sub { # for git_to_mail
- my ($buf, $oid, $kw) = @_;
- return _buf2maildir($dst, $buf, $oid, $kw) if !$dedupe;
+ my ($buf, $smsg) = @_;
+ return _buf2maildir($dst, $buf, $smsg) if !$dedupe;
my $eml = PublicInbox::Eml->new($$buf); # copy buf
- return if $dedupe->is_dup($eml, $oid);
+ return if $dedupe->is_dup($eml, $smsg->{blob});
undef $eml;
- _buf2maildir($dst, $buf, $oid, $kw);
+ _buf2maildir($dst, $buf, $smsg);
}
}
@@ -447,7 +453,7 @@ sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
}
sub write_mail { # via ->wq_do
- my ($self, $git_dir, $oid, $lei, $kw) = @_;
+ my ($self, $git_dir, $smsg, $lei) = @_;
my $not_done = delete $self->{4}; # write end of {each_smsg_done}
my $wcb = $self->{wcb} //= do { # first message
my %sig = $lei->atfork_child_wq($self);
@@ -456,7 +462,7 @@ sub write_mail { # via ->wq_do
$self->write_cb($lei);
};
my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
- $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw, $not_done ]);
+ $git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]);
}
sub ipc_atfork_prepare {
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index 6673d9a6..47c0e3d4 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -18,11 +18,12 @@ my $noeol = "Subject: x\n\nFrom hell";
my $crlf = $noeol;
$crlf =~ s/\n/\r\n/g;
my $kw = [qw(seen answered flagged)];
+my $smsg = { kw => $kw, blob => '0'x40 };
my @MBOX = qw(mboxcl2 mboxrd mboxcl mboxo);
for my $mbox (@MBOX) {
my $m = "eml2$mbox";
my $cb = PublicInbox::LeiToMail->can($m);
- my $s = $cb->(PublicInbox::Eml->new($from), $kw);
+ my $s = $cb->(PublicInbox::Eml->new($from), $smsg);
is(substr($$s, -1, 1), "\n", "trailing LF in normal $mbox");
my $eml = PublicInbox::Eml->new($s);
is($eml->header('Status'), 'OR', "Status: set by $m");
@@ -40,7 +41,7 @@ for my $mbox (@MBOX) {
} else {
is(scalar(@cl), 0, "$m clobbered Content-Length");
}
- $s = $cb->(PublicInbox::Eml->new($noeol), $kw);
+ $s = $cb->(PublicInbox::Eml->new($noeol), $smsg);
is(substr($$s, -1, 1), "\n",
"trailing LF added by $m when original lacks EOL");
$eml = PublicInbox::Eml->new($s);
@@ -49,7 +50,7 @@ for my $mbox (@MBOX) {
} else {
is($eml->body_raw, ">From hell\n", "From escaped once by $m");
}
- $s = $cb->(PublicInbox::Eml->new($crlf), $kw);
+ $s = $cb->(PublicInbox::Eml->new($crlf), $smsg);
is(substr($$s, -2, 2), "\r\n",
"trailing CRLF added $m by original lacks EOL");
$eml = PublicInbox::Eml->new($s);
@@ -62,7 +63,7 @@ for my $mbox (@MBOX) {
is($eml->header('Content-Length') + length("\r\n"),
length($eml->body_raw), "$m Content-Length matches");
} elsif ($mbox eq 'mboxrd') {
- $s = $cb->($eml, $kw);
+ $s = $cb->($eml, $smsg);
$eml = PublicInbox::Eml->new($s);
is($eml->body_raw,
">>From hell\r\n\r\n", "From escaped again by $m");
@@ -102,11 +103,12 @@ my $wcb_get = sub {
$cb;
};
+my $deadbeef = { blob => 'deadbeef', kw => [ qw(seen) ] };
my $orig = do {
my $wcb = $wcb_get->($mbox, $fn);
is(ref $wcb, 'CODE', 'write_cb returned callback');
ok(-f $fn && !-s _, 'empty file created');
- $wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]);
+ $wcb->(\(my $dup = $buf), $deadbeef);
undef $wcb;
open my $fh, '<', $fn or BAIL_OUT $!;
my $raw = do { local $/; <$fh> };
@@ -116,7 +118,7 @@ my $orig = do {
local $lei->{opt} = { jobs => 2 };
$wcb = $wcb_get->($mbox, $fn);
ok(-f $fn && !-s _, 'truncated mbox destination');
- $wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]);
+ $wcb->(\($dup = $buf), $deadbeef);
undef $wcb;
open $fh, '<', $fn or BAIL_OUT $!;
is(do { local $/; <$fh> }, $raw, 'jobs > 1');
@@ -131,7 +133,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
ok($dc_cmd, "decompressor for .$zsfx");
my $f = "$fn.$zsfx";
my $wcb = $wcb_get->($mbox, $f);
- $wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]);
+ $wcb->(\(my $dup = $buf), $deadbeef);
undef $wcb;
my $uncompressed = xqx([@$dc_cmd, $f]);
is($uncompressed, $orig, "$zsfx works unlocked");
@@ -139,13 +141,13 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
local $lei->{opt} = { jobs => 2 }; # for atomic writes
unlink $f or BAIL_OUT "unlink $!";
$wcb = $wcb_get->($mbox, $f);
- $wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]);
+ $wcb->(\($dup = $buf), $deadbeef);
undef $wcb;
is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock");
local $lei->{opt} = { augment => 1 };
$wcb = $wcb_get->($mbox, $f);
- $wcb->(\($dup = $buf . "\nx\n"), 'deadbeef', [ qw(seen) ]);
+ $wcb->(\($dup = $buf . "\nx\n"), $deadbeef);
undef $wcb; # commit
my $cat = popen_rd([@$dc_cmd, $f]);
@@ -157,7 +159,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
local $lei->{opt} = { augment => 1, jobs => 2 };
$wcb = $wcb_get->($mbox, $f);
- $wcb->(\($dup = $buf . "\ny\n"), 'deadbeef', [ qw(seen) ]);
+ $wcb->(\($dup = $buf . "\ny\n"), $deadbeef);
undef $wcb; # commit
my @raw3;
@@ -179,7 +181,8 @@ my $as_orig = sub {
unlink $fn or BAIL_OUT $!;
if ('default deduplication uses content_hash') {
my $wcb = $wcb_get->('mboxo', $fn);
- $wcb->(\(my $x = $buf), 'deadbeef', []) for (1..2);
+ $deadbeef->{kw} = [];
+ $wcb->(\(my $x = $buf), $deadbeef) for (1..2);
undef $wcb; # undef to commit changes
my $cmp = '';
open my $fh, '<', $fn or BAIL_OUT $!;
@@ -188,7 +191,7 @@ if ('default deduplication uses content_hash') {
local $lei->{opt} = { augment => 1 };
$wcb = $wcb_get->('mboxo', $fn);
- $wcb->(\($x = $buf . "\nx\n"), 'deadbeef', []) for (1..2);
+ $wcb->(\($x = $buf . "\nx\n"), $deadbeef) for (1..2);
undef $wcb; # undef to commit changes
open $fh, '<', $fn or BAIL_OUT $!;
my @x;
@@ -202,7 +205,7 @@ if ('default deduplication uses content_hash') {
open my $tmp, '+>', undef or BAIL_OUT $!;
local $lei->{1} = $tmp;
my $wcb = $wcb_get->('mboxrd', '/dev/stdout');
- $wcb->(\(my $x = $buf), 'deadbeef', []);
+ $wcb->(\(my $x = $buf), $deadbeef);
undef $wcb; # commit
seek($tmp, 0, SEEK_SET) or BAIL_OUT $!;
my $cmp = '';
@@ -216,7 +219,7 @@ SKIP: { # FIFO support
mkfifo($fn, 0600) or skip("mkfifo not supported: $!", 1);
my $cat = popen_rd([which('cat'), $fn]);
my $wcb = $wcb_get->('mboxo', $fn);
- $wcb->(\(my $x = $buf), 'deadbeef', []);
+ $wcb->(\(my $x = $buf), $deadbeef);
undef $wcb; # commit
my $cmp = '';
PublicInbox::MboxReader->mboxo($cat, sub { $cmp .= $as_orig->(@_) });
@@ -227,7 +230,8 @@ SKIP: { # FIFO support
my $md = "$tmpdir/maildir/";
my $wcb = $wcb_get->('maildir', $md);
is(ref($wcb), 'CODE', 'got Maildir callback');
- $wcb->(\(my $x = $buf), 'badc0ffee', []);
+ my $b4dc0ffee = { blob => 'badc0ffee', kw => [] };
+ $wcb->(\(my $x = $buf), $b4dc0ffee);
my @f;
PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @f, shift });
@@ -235,7 +239,8 @@ SKIP: { # FIFO support
is(do { local $/; <$fh> }, $buf, 'wrote to Maildir');
$wcb = $wcb_get->('maildir', $md);
- $wcb->(\($x = $buf."\nx\n"), 'deadcafe', []);
+ my $deadcafe = { blob => 'deadcafe', kw => [] };
+ $wcb->(\($x = $buf."\nx\n"), $deadcafe);
my @x = ();
PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @x, shift });
@@ -246,8 +251,8 @@ SKIP: { # FIFO support
local $lei->{opt}->{augment} = 1;
$wcb = $wcb_get->('maildir', $md);
- $wcb->(\($x = $buf."\ny\n"), 'deadcafe', []);
- $wcb->(\($x = $buf."\ny\n"), 'b4dc0ffee', []); # skipped by dedupe
+ $wcb->(\($x = $buf."\ny\n"), $deadcafe);
+ $wcb->(\($x = $buf."\ny\n"), $b4dc0ffee); # skipped by dedupe
@f = ();
PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @f, shift });
is(scalar grep(/\A\Q$x[0]\E\z/, @f), 1, 'old file still there');
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 05/12] lei: fix inadvertant FD sharing
2021-01-21 19:46 [PATCH 00/12] lei: another dump Eric Wong
` (3 preceding siblings ...)
2021-01-21 19:46 ` [PATCH 04/12] lei: show {pct} and {oid} in From_ lines and filenames Eric Wong
@ 2021-01-21 19:46 ` Eric Wong
2021-01-21 19:46 ` [PATCH 06/12] lei_to_mail: avoid segfault on exit Eric Wong
` (6 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2021-01-21 19:46 UTC (permalink / raw)
To: meta
$wq->{-ipc_atfork_child_close} needed to be initialized properly.
And start setting $0 in workers to improve visibility.
---
lib/PublicInbox/IPC.pm | 22 ++++++++++++++++++----
lib/PublicInbox/LEI.pm | 9 +++++----
lib/PublicInbox/LeiQuery.pm | 21 +++++++++++----------
lib/PublicInbox/LeiToMail.pm | 2 +-
lib/PublicInbox/LeiXSearch.pm | 27 ++++++++++++---------------
5 files changed, 47 insertions(+), 34 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 8fec2e62..24f45e03 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -134,6 +134,12 @@ sub ipc_worker_reap { # dwaitpid callback
warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15;
}
+sub wq_wait_old {
+ my ($self) = @_;
+ my $pids = delete $self->{"-wq_old_pids.$$"} or return;
+ dwaitpid($_, \&ipc_worker_reap, $self) for @$pids;
+}
+
# for base class, override in sub classes
sub ipc_atfork_prepare {}
@@ -370,17 +376,25 @@ sub wq_workers {
}
sub wq_close {
- my ($self) = @_;
+ my ($self, $nohang) = @_;
delete @$self{qw(-wq_s1 -wq_s2)} or return;
my $ppid = delete $self->{-wq_ppid} or return;
my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
return if $ppid != $$; # can't reap siblings or parents
- return (keys %$workers) if wantarray; # caller will reap
- for my $pid (keys %$workers) {
- dwaitpid($pid, \&ipc_worker_reap, $self);
+ my @pids = map { $_ + 0 } keys %$workers;
+ if ($nohang) {
+ push @{$self->{"-wq_old_pids.$$"}}, @pids;
+ } else {
+ dwaitpid($_, \&ipc_worker_reap, $self) for @pids;
}
}
+sub wq_kill_old {
+ my ($self) = @_;
+ my $pids = $self->{"-wq_old_pids.$$"} or return;
+ kill 'TERM', @$pids;
+}
+
sub wq_kill {
my ($self, $sig) = @_;
my $workers = $self->{-wq_workers} or return;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 6be6d10b..2cb2bf40 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -281,11 +281,14 @@ sub fail ($$;$) {
sub atfork_prepare_wq {
my ($self, $wq) = @_;
- my $tcafc = $wq->{-ipc_atfork_child_close};
+ my $tcafc = $wq->{-ipc_atfork_child_close} //= [];
push @$tcafc, @TO_CLOSE_ATFORK_CHILD;
if (my $sock = $self->{sock}) {
push @$tcafc, @$self{qw(0 1 2)}, $sock;
}
+ if (my $pgr = $self->{pgr}) {
+ push @$tcafc, @$pgr[1,2];
+ }
for my $f (qw(lxs l2m)) {
my $ipc = $self->{$f} or next;
push @$tcafc, grep { defined }
@@ -335,9 +338,7 @@ sub atfork_parent_wq {
my $l2m = $ret->{l2m};
if ($l2m && $l2m != $wq) { # $wq == lxs
$io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1};
- if (my @pids = $l2m->wq_close) {
- $wq->{l2m_pids} = \@pids;
- }
+ $l2m->wq_close(1);
}
($ret, @io);
}
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 941bc299..7d634b5e 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -32,24 +32,25 @@ sub lei_q {
my $sto = $self->_lei_store(1);
push @srcs, $sto->search;
}
- my $lxs = PublicInbox::LeiXSearch->new;
+ my $lxs = $self->{lxs} = PublicInbox::LeiXSearch->new;
# --external is enabled by default, but allow --no-external
- if ($opt->{external} // 1) {
+ if ($opt->{external} //= 1) {
$self->_externals_each(\&_vivify_external, \@srcs);
}
- my $j = $opt->{jobs} // (scalar(@srcs) > 3 ? 3 : scalar(@srcs));
- $j = 1 if !$opt->{thread};
- $self->atfork_prepare_wq($lxs);
- $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset);
- $self->{lxs} = $lxs;
-
+ my $xj = $opt->{jobs} // (scalar(@srcs) > 3 ? 3 : scalar(@srcs));
+ $xj = 1 if !$opt->{thread};
my $ovv = PublicInbox::LeiOverview->new($self) or return;
+ $self->atfork_prepare_wq($lxs);
+ $lxs->wq_workers_start('lei_xsearch', $xj, $self->oldset);
+ delete $lxs->{-ipc_atfork_child_close};
if (my $l2m = $self->{l2m}) {
- $j = 4 if $j <= 4; # TODO configurable
+ my $mj = 4; # TODO: configurable
$self->atfork_prepare_wq($l2m);
- $l2m->wq_workers_start('lei2mail', $j, $self->oldset);
+ $l2m->wq_workers_start('lei2mail', $mj, $self->oldset);
+ delete $l2m->{-ipc_atfork_child_close};
}
+
# no forking workers after this
my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 3dcce9e7..87cc9c47 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -467,7 +467,7 @@ sub write_mail { # via ->wq_do
sub ipc_atfork_prepare {
my ($self) = @_;
- # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr)
+ # (done_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr)
$self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= >&=]);
$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
}
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 13611882..7b33677e 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -110,6 +110,7 @@ sub wait_startq ($) {
sub query_thread_mset { # for --thread
my ($self, $lei, $ibxish) = @_;
+ local $0 = "$0 query_thread_mset";
my $startq = delete $self->{5};
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
@@ -148,6 +149,7 @@ sub query_thread_mset { # for --thread
sub query_mset { # non-parallel for non-"--thread" users
my ($self, $lei, $srcs) = @_;
+ local $0 = "$0 query_mset";
my $startq = delete $self->{5};
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
@@ -192,12 +194,10 @@ sub git {
sub query_done { # EOF callback
my ($self, $lei) = @_;
my $l2m = delete $lei->{l2m};
- if (my $pids = delete $self->{l2m_pids}) {
- my $ipc_worker_reap = $self->can('ipc_worker_reap');
- dwaitpid($_, $ipc_worker_reap, $l2m) for @$pids;
- }
+ $l2m->wq_wait_old if $l2m;
+ $self->wq_wait_old;
$lei->{ovv}->ovv_end($lei);
- if ($l2m) { # calls LeiToMail reap_compress
+ if ($l2m) { # close() calls LeiToMail reap_compress
close(delete($lei->{1})) if $lei->{1};
$lei->start_mua;
}
@@ -232,12 +232,12 @@ sub start_query { # always runs in main (lei-daemon) process
for my $rmt (@$remotes) {
$self->wq_do('query_thread_mbox', $io, $lei, $rmt);
}
- close $io->[0]; # qry_status_wr
@$io = ();
}
sub query_prepare { # called by wq_do
my ($self, $lei) = @_;
+ local $0 = "$0 query_prepare";
my %sig = $lei->atfork_child_wq($self);
-p $lei->{0} or die "BUG: \$done pipe expected";
local @SIG{keys %sig} = values %sig;
@@ -246,11 +246,11 @@ sub query_prepare { # called by wq_do
syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!";
}
-sub sigpipe_handler {
- my ($self, $lei_orig, $pids) = @_;
- if ($pids) { # one-shot (no event loop)
- kill 'TERM', @$pids;
+sub sigpipe_handler { # handles SIGPIPE from wq workers
+ my ($self, $lei_orig) = @_;
+ if ($self->wq_kill_old) {
kill 'PIPE', $$;
+ $self->wq_wait_old;
} else {
$self->wq_kill;
$self->wq_close;
@@ -287,19 +287,16 @@ sub do_query {
$io[1] = $zpipe->[1] if $zpipe;
}
start_query($self, \@io, $lei, $srcs);
+ $self->wq_close(1);
unless ($in_loop) {
- my @pids = $self->wq_close;
# for the $lei->atfork_child_wq PIPE handler:
- $done_op->{'!'}->[3] = \@pids;
while ($done->{sock}) { $done->event_step }
- my $ipc_worker_reap = $self->can('ipc_worker_reap');
- dwaitpid($_, $ipc_worker_reap, $self) for @pids;
}
}
sub ipc_atfork_prepare {
my ($self) = @_;
- # (0: qry_status_wr, 1: stdout|mbox, 2: stderr,
+ # (0: done_wr, 1: stdout|mbox, 2: stderr,
# 3: sock, 4: $l2m->{-wq_s1}, 5: $startq)
$self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&= <&=]);
$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 06/12] lei_to_mail: avoid segfault on exit
2021-01-21 19:46 [PATCH 00/12] lei: another dump Eric Wong
` (4 preceding siblings ...)
2021-01-21 19:46 ` [PATCH 05/12] lei: fix inadvertant FD sharing Eric Wong
@ 2021-01-21 19:46 ` Eric Wong
2021-01-21 19:46 ` [PATCH 07/12] lei: oneshot: use client $io[2] for placeholder Eric Wong
` (5 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2021-01-21 19:46 UTC (permalink / raw)
To: meta
Worker exit causes DESTROY ordering to become unpredictable and
leads to Perl segfaulting. Instead, rely on OnDestroy and
explicit triggering after wq_worker_loop to ensure we finish
all outstanding git requests before worker exit.
---
lib/PublicInbox/LeiToMail.pm | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 87cc9c47..cea68319 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -11,6 +11,7 @@ use PublicInbox::Lock;
use PublicInbox::ProcessPipe;
use PublicInbox::Spawn qw(which spawn popen_rd);
use PublicInbox::LeiDedupe;
+use PublicInbox::OnDestroy;
use Symbol qw(gensym);
use IO::Handle; # ->autoflush
use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
@@ -472,12 +473,21 @@ sub ipc_atfork_prepare {
$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
}
-sub DESTROY {
+# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY
+# ordering is unstable at worker exit and may cause segfaults
+sub reap_gits {
my ($self) = @_;
- for my $pid_git (grep(/\A$$\0/, keys %$self)) {
- $self->{$pid_git}->async_wait_all;
+ for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) {
+ $git->async_wait_all;
}
- $self->SUPER::DESTROY; # PublicInbox::IPC
+}
+
+sub ipc_atfork_child { # runs after IPC::wq_worker_loop
+ my ($self) = @_;
+ $self->SUPER::ipc_atfork_child;
+ # reap_gits needs to run before $self->DESTROY,
+ # IPC.pm will ensure that.
+ PublicInbox::OnDestroy->new($$, \&reap_gits, $self);
}
1;
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 07/12] lei: oneshot: use client $io[2] for placeholder
2021-01-21 19:46 [PATCH 00/12] lei: another dump Eric Wong
` (5 preceding siblings ...)
2021-01-21 19:46 ` [PATCH 06/12] lei_to_mail: avoid segfault on exit Eric Wong
@ 2021-01-21 19:46 ` Eric Wong
2021-01-21 19:46 ` [PATCH 08/12] lei: remove INT/QUIT/TERM handlers, fix daemon EOF Eric Wong
` (4 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2021-01-21 19:46 UTC (permalink / raw)
To: meta
STDERR may actually get closed in ->ipc_atfork_child in
oneshot mode, so ensure we pass in a valid file handle
to avoid warnings in ->wq_do.
---
lib/PublicInbox/LEI.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 2cb2bf40..11ea385f 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -334,7 +334,7 @@ sub atfork_parent_wq {
$self->{env} = $env;
delete @$ret{qw(-lei_store cfg pgr lxs)}; # keep l2m
my @io = delete @$ret{0..2};
- $io[3] = delete($ret->{sock}) // *STDERR{GLOB};
+ $io[3] = delete($ret->{sock}) // $io[2];
my $l2m = $ret->{l2m};
if ($l2m && $l2m != $wq) { # $wq == lxs
$io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1};
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 08/12] lei: remove INT/QUIT/TERM handlers, fix daemon EOF
2021-01-21 19:46 [PATCH 00/12] lei: another dump Eric Wong
` (6 preceding siblings ...)
2021-01-21 19:46 ` [PATCH 07/12] lei: oneshot: use client $io[2] for placeholder Eric Wong
@ 2021-01-21 19:46 ` Eric Wong
2021-01-21 19:46 ` [PATCH 09/12] lei_xsearch: reduce reference paths to lxs Eric Wong
` (3 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2021-01-21 19:46 UTC (permalink / raw)
To: meta
The signal handlers on the client side were unnecessary;
all we need is to handle socket EOF properly in the daemon
by killing xsearch and l2m workers.
---
lib/PublicInbox/IPC.pm | 1 +
lib/PublicInbox/LEI.pm | 9 ++++++++-
script/lei | 5 -----
3 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 24f45e03..dbb87e4e 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -408,6 +408,7 @@ sub DESTROY {
my $ppid = $self->{-wq_ppid};
wq_kill($self) if $ppid && $ppid == $$;
wq_close($self);
+ wq_wait_old($self);
ipc_worker_stop($self);
}
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 11ea385f..ccfc1649 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -767,7 +767,14 @@ sub accept_dispatch { # Listener {post_accept} callback
sub dclose {
my ($self) = @_;
- delete $self->{lxs}; # stops LeiXSearch queries
+ for my $f (qw(lxs l2m)) {
+ my $wq = delete $self->{$f} or next;
+ if ($wq->wq_kill) {
+ $self->wq_close
+ } elsif ($wq->wq_kill_old) {
+ $wq->wq_wait_old;
+ }
+ }
close(delete $self->{1}) if $self->{1}; # may reap_compress
$self->close if $self->{sock}; # PublicInbox::DS::close
}
diff --git a/script/lei b/script/lei
index a4a0217b..8dcea562 100755
--- a/script/lei
+++ b/script/lei
@@ -81,11 +81,6 @@ Falling back to (slow) one-shot mode
while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
$buf .= "\0\0";
$send_cmd->($sock, [ 0, 1, 2, fileno($dh) ], $buf, MSG_EOR);
- $SIG{TERM} = $SIG{INT} = $SIG{QUIT} = sub {
- my ($sig) = @_; # 'TERM', not an integer :<
- $SIG{$sig} = 'DEFAULT';
- kill($sig, $$); # exit($signo + 128)
- };
my $x_it_code = 0;
while (1) {
my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33);
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 09/12] lei_xsearch: reduce reference paths to lxs
2021-01-21 19:46 [PATCH 00/12] lei: another dump Eric Wong
` (7 preceding siblings ...)
2021-01-21 19:46 ` [PATCH 08/12] lei: remove INT/QUIT/TERM handlers, fix daemon EOF Eric Wong
@ 2021-01-21 19:46 ` Eric Wong
2021-01-21 19:46 ` [PATCH 10/12] lei: remove @TO_CLOSE_ATFORK_CHILD Eric Wong
` (2 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2021-01-21 19:46 UTC (permalink / raw)
To: meta
Having an extra reference to LeiXSearch from the OpPipe $done_op
map is unnecessary and makes the reference graph more complex
than it needs to be. Just use $lei->{lxs} to simplify and
reduce the likelihood of bugs.
---
lib/PublicInbox/LeiXSearch.pm | 30 +++++++++++++++---------------
1 file changed, 15 insertions(+), 15 deletions(-)
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 7b33677e..987a9896 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -192,12 +192,14 @@ sub git {
}
sub query_done { # EOF callback
- my ($self, $lei) = @_;
- my $l2m = delete $lei->{l2m};
- $l2m->wq_wait_old if $l2m;
- $self->wq_wait_old;
+ my ($lei) = @_;
+ my $has_l2m = exists $lei->{l2m};
+ for my $f (qw(lxs l2m)) {
+ my $wq = delete $lei->{$f} or next;
+ $wq->wq_wait_old;
+ }
$lei->{ovv}->ovv_end($lei);
- if ($l2m) { # close() calls LeiToMail reap_compress
+ if ($has_l2m) { # close() calls LeiToMail reap_compress
close(delete($lei->{1})) if $lei->{1};
$lei->start_mua;
}
@@ -246,16 +248,14 @@ sub query_prepare { # called by wq_do
syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!";
}
-sub sigpipe_handler { # handles SIGPIPE from wq workers
- my ($self, $lei_orig) = @_;
- if ($self->wq_kill_old) {
+sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
+ my ($lei) = @_;
+ my $lxs = delete $lei->{lxs};
+ if ($lxs && $lxs->wq_kill_old) {
kill 'PIPE', $$;
- $self->wq_wait_old;
- } else {
- $self->wq_kill;
- $self->wq_close;
+ $lxs->wq_wait_old;
}
- close(delete $lei_orig->{1}) if $lei_orig->{1};
+ close(delete $lei->{1}) if $lei->{1};
}
sub do_query {
@@ -266,8 +266,8 @@ sub do_query {
$lei_orig->event_step_init; # wait for shutdowns
my $done_op = {
- '' => [ \&query_done, $self, $lei_orig ],
- '!' => [ \&sigpipe_handler, $self, $lei_orig ]
+ '' => [ \&query_done, $lei_orig ],
+ '!' => [ \&sigpipe_handler, $lei_orig ]
};
my $in_loop = exists $lei_orig->{sock};
$done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 10/12] lei: remove @TO_CLOSE_ATFORK_CHILD
2021-01-21 19:46 [PATCH 00/12] lei: another dump Eric Wong
` (8 preceding siblings ...)
2021-01-21 19:46 ` [PATCH 09/12] lei_xsearch: reduce reference paths to lxs Eric Wong
@ 2021-01-21 19:46 ` Eric Wong
2021-01-21 19:46 ` [PATCH 11/12] lei: forget-external support with canonicalization Eric Wong
2021-01-21 19:46 ` [PATCH 12/12] lei forget-external: bash completion support Eric Wong
11 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2021-01-21 19:46 UTC (permalink / raw)
To: meta
..At least limit it to a single file handle. The write end of the
EOFpipe can be limited in scope and auto-closed when $quit is
clobbered, leaving only the listener. The listener is the only
handle that needs to be closed explicitly due to it being on the
stack in the Listener->event_step => accept_dispatch => lei_$FOO
code path.
Everything else gets clobbered by DS->Reset in children after
forking.
---
lib/PublicInbox/LEI.pm | 40 +++++++++++++++++++---------------------
1 file changed, 19 insertions(+), 21 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index ccfc1649..37b45a00 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -27,7 +27,7 @@ use Text::Wrap qw(wrap);
use File::Path qw(mkpath);
use File::Spec;
our $quit = \&CORE::exit;
-our ($current_lei, $errors_log);
+our ($current_lei, $errors_log, $listener);
my ($recv_cmd, $send_cmd);
my $GLP = Getopt::Long::Parser->new;
$GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
@@ -35,7 +35,6 @@ my $GLP_PASS = Getopt::Long::Parser->new;
$GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through));
our %PATH2CFG; # persistent for socket daemon
-our @TO_CLOSE_ATFORK_CHILD;
# TBD: this is a documentation mechanism to show a subcommand
# (may) pass options through to another command:
@@ -281,8 +280,7 @@ sub fail ($$;$) {
sub atfork_prepare_wq {
my ($self, $wq) = @_;
- my $tcafc = $wq->{-ipc_atfork_child_close} //= [];
- push @$tcafc, @TO_CLOSE_ATFORK_CHILD;
+ my $tcafc = $wq->{-ipc_atfork_child_close} //= [ $listener // () ];
if (my $sock = $self->{sock}) {
push @$tcafc, @$self{qw(0 1 2)}, $sock;
}
@@ -307,7 +305,6 @@ sub atfork_child_wq {
%PATH2CFG = ();
undef $errors_log;
$quit = \&CORE::exit;
- @TO_CLOSE_ATFORK_CHILD = ();
(__WARN__ => sub { err($self, @_) },
PIPE => sub {
$self->x_it(13); # SIGPIPE = 13
@@ -837,12 +834,12 @@ sub lazy_start {
die "connect($path): $!";
}
umask(077) // die("umask(077): $!");
- socket(my $l, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
- bind($l, pack_sockaddr_un($path)) or die "bind($path): $!";
- listen($l, 1024) or die "listen: $!";
+ local $listener;
+ socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
+ bind($listener, pack_sockaddr_un($path)) or die "bind($path): $!";
+ listen($listener, 1024) or die "listen: $!";
my @st = stat($path) or die "stat($path): $!";
my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
- pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
local $oldset = PublicInbox::DS::block_signals();
if ($narg == 5) {
$send_cmd = PublicInbox::Spawn->can('send_cmd4');
@@ -869,20 +866,21 @@ sub lazy_start {
return if $pid;
$0 = "lei-daemon $path";
local %PATH2CFG;
- local @TO_CLOSE_ATFORK_CHILD = ($l, $eof_w);
- $l->blocking(0);
- $l = PublicInbox::Listener->new($l, \&accept_dispatch, $l);
+ $listener->blocking(0);
my $exit_code;
- local $quit = sub {
- $exit_code //= shift;
- my $listener = $l or exit($exit_code);
- # closing eof_w triggers \&noop wakeup
- $eof_w = $l = $path = undef;
- $listener->close; # DS::close
- PublicInbox::DS->SetLoopTimeout(1000);
+ my $pil = PublicInbox::Listener->new($listener, \&accept_dispatch);
+ local $quit = do {
+ pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
+ PublicInbox::EOFpipe->new($eof_r, \&noop, undef);
+ sub {
+ $exit_code //= shift;
+ my $lis = $pil or exit($exit_code);
+ # closing eof_w triggers \&noop wakeup
+ $listener = $eof_w = $pil = $path = undef;
+ $lis->close; # DS::close
+ PublicInbox::DS->SetLoopTimeout(1000);
+ };
};
- PublicInbox::EOFpipe->new($eof_r, \&noop, undef);
- undef $eof_r;
my $sig = {
CHLD => \&PublicInbox::DS::enqueue_reap,
QUIT => $quit,
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 11/12] lei: forget-external support with canonicalization
2021-01-21 19:46 [PATCH 00/12] lei: another dump Eric Wong
` (9 preceding siblings ...)
2021-01-21 19:46 ` [PATCH 10/12] lei: remove @TO_CLOSE_ATFORK_CHILD Eric Wong
@ 2021-01-21 19:46 ` Eric Wong
2021-01-21 19:46 ` [PATCH 12/12] lei forget-external: bash completion support Eric Wong
11 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2021-01-21 19:46 UTC (permalink / raw)
To: meta
For proper matching, we'll do a better job canonicalizing
URLs and path names. Of course, users may edit
the file outside of lei, so ensure we try both the canonicalized
and as-is form provided by the user.
I also don't think we'll need to store externals info in
MiscIdx; just the config file is fine.
---
MANIFEST | 1 +
lib/PublicInbox/LEI.pm | 24 ++++++++++-----
lib/PublicInbox/LeiExternal.pm | 54 +++++++++++++++++++++++++++-------
t/lei.t | 9 ++++++
t/lei_external.t | 18 ++++++++++++
5 files changed, 88 insertions(+), 18 deletions(-)
create mode 100644 t/lei_external.t
diff --git a/MANIFEST b/MANIFEST
index 0de1de4a..ddee1539 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -339,6 +339,7 @@ t/kqnotify.t
t/lei-oneshot.t
t/lei.t
t/lei_dedupe.t
+t/lei_external.t
t/lei_overview.t
t/lei_store.t
t/lei_to_mail.t
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 37b45a00..9c3d7279 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -21,7 +21,7 @@ use PublicInbox::Config;
use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET);
use PublicInbox::Sigfd;
use PublicInbox::DS qw(now dwaitpid);
-use PublicInbox::Spawn qw(spawn run_die popen_rd);
+use PublicInbox::Spawn qw(spawn popen_rd);
use PublicInbox::OnDestroy;
use Text::Wrap qw(wrap);
use File::Path qw(mkpath);
@@ -95,7 +95,7 @@ our %CMD = ( # sorted in order of importance/use:
qw(boost=i quiet|q) ],
'ls-external' => [ '[FILTER...]', 'list publicinbox|extindex locations',
qw(format|f=s z|0 local remote quiet|q) ],
-'forget-external' => [ '{URL_OR_PATHNAME|--prune}',
+'forget-external' => [ 'URL_OR_PATHNAME...|--prune',
'exclude further results from a publicinbox|extindex',
qw(prune quiet|q) ],
@@ -114,7 +114,7 @@ our %CMD = ( # sorted in order of importance/use:
"exclude message(s) on stdin from `q' search results",
qw(stdin| oid=s exact by-mid|mid:s quiet|q) ],
-'purge-mailsource' => [ '{URL_OR_PATHNAME|--all}',
+'purge-mailsource' => [ 'URL_OR_PATHNAME|--all',
'remove imported messages from IMAP, Maildirs, and MH',
qw(exact! all jobs:i indexed) ],
@@ -137,7 +137,7 @@ our %CMD = ( # sorted in order of importance/use:
'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
qw(prune) ],
-'import' => [ '{URL_OR_PATHNAME|--stdin}',
+'import' => [ 'URL_OR_PATHNAME|--stdin',
'one-shot import/update from URL or filesystem',
qw(stdin| offset=i recursive|r exclude=s include=s !flags),
],
@@ -468,6 +468,7 @@ sub optparse ($$$) {
last;
} # else continue looping
}
+ last if $ok;
my $last = pop @or;
$err = join(', ', @or) . " or $last must be set";
} else {
@@ -547,16 +548,23 @@ sub lei_mark {
my ($self, @argv) = @_;
}
-sub lei_config {
+sub _config {
my ($self, @argv) = @_;
- $self->{opt}->{'config-file'} and return fail $self,
- "config file switches not supported by `lei config'";
my $env = $self->{env};
delete local $env->{GIT_CONFIG};
+ delete local $ENV{GIT_CONFIG};
my $cfg = _lei_cfg($self, 1);
my $cmd = [ qw(git config -f), $cfg->{'-f'}, @argv ];
my %rdr = map { $_ => $self->{$_} } (0..2);
- run_die($cmd, $env, \%rdr);
+ waitpid(spawn($cmd, $env, \%rdr), 0);
+}
+
+sub lei_config {
+ my ($self, @argv) = @_;
+ $self->{opt}->{'config-file'} and return fail $self,
+ "config file switches not supported by `lei config'";
+ _config(@_);
+ x_it($self, $?) if $?;
}
sub lei_init {
diff --git a/lib/PublicInbox/LeiExternal.pm b/lib/PublicInbox/LeiExternal.pm
index 64faf5a0..21071058 100644
--- a/lib/PublicInbox/LeiExternal.pm
+++ b/lib/PublicInbox/LeiExternal.pm
@@ -7,6 +7,7 @@ use strict;
use v5.10.1;
use parent qw(Exporter);
our @EXPORT = qw(lei_ls_external lei_add_external lei_forget_external);
+use PublicInbox::Config;
sub _externals_each {
my ($self, $cb, @arg) = @_;
@@ -30,7 +31,6 @@ sub _externals_each {
sub lei_ls_external {
my ($self, @argv) = @_;
- my $stor = $self->_lei_store(0);
my $out = $self->{1};
my ($OFS, $ORS) = $self->{opt}->{z} ? ("\0", "\0\0") : (" ", "\n");
$self->_externals_each(sub {
@@ -39,24 +39,58 @@ sub lei_ls_external {
});
}
+sub _canonicalize {
+ my ($location) = @_;
+ if ($location !~ m!\Ahttps?://!) {
+ PublicInbox::Config::rel2abs_collapsed($location);
+ } else {
+ require URI;
+ my $uri = URI->new($location)->canonical;
+ my $path = $uri->path . '/';
+ $path =~ tr!/!/!s; # squeeze redundant '/'
+ $uri->path($path);
+ $uri->as_string;
+ }
+}
+
sub lei_add_external {
- my ($self, $url_or_dir) = @_;
+ my ($self, $location) = @_;
my $cfg = $self->_lei_cfg(1);
- if ($url_or_dir !~ m!\Ahttps?://!) {
- $url_or_dir = File::Spec->canonpath($url_or_dir);
- }
my $new_boost = $self->{opt}->{boost} // 0;
- my $key = "external.$url_or_dir.boost";
+ $location = _canonicalize($location);
+ my $key = "external.$location.boost";
my $cur_boost = $cfg->{$key};
return if defined($cur_boost) && $cur_boost == $new_boost; # idempotent
$self->lei_config($key, $new_boost);
- my $stor = $self->_lei_store(1);
- # TODO: add to MiscIdx
- $stor->done;
+ $self->_lei_store(1)->done; # just create the store
}
sub lei_forget_external {
- # TODO
+ my ($self, @locations) = @_;
+ my $cfg = $self->_lei_cfg(1);
+ my $quiet = $self->{opt}->{quiet};
+ for my $loc (@locations) {
+ my (@unset, @not_found);
+ for my $l ($loc, _canonicalize($loc)) {
+ my $key = "external.$l.boost";
+ delete($cfg->{$key});
+ $self->_config('--unset', $key);
+ if ($? == 0) {
+ push @unset, $key;
+ } elsif (($? >> 8) == 5) {
+ push @not_found, $key;
+ } else {
+ $self->err("# --unset $key error");
+ return $self->x_it($?);
+ }
+ }
+ if (@unset) {
+ next if $quiet;
+ $self->err("# $_ unset") for @unset;
+ } elsif (@not_found) {
+ $self->err("# $_ not found") for @not_found;
+ } # else { already exited
+ }
}
1;
diff --git a/t/lei.t b/t/lei.t
index ef820fe3..50ad2bb1 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -180,6 +180,15 @@ my $test_external = sub {
});
$lei->('ls-external');
like($out, qr/boost=0\n/s, 'ls-external has output');
+ ok($lei->(qw(add-external -q https://EXAMPLE.com/ibx)), 'add remote');
+ is($err, '', 'no warnings after add-external');
+ $lei->('ls-external');
+ like($out, qr!https://example\.com/ibx/!s, 'added canonical URL');
+ is($err, '', 'no warnings on ls-external');
+ ok($lei->(qw(forget-external -q https://EXAMPLE.com/ibx)),
+ 'forget');
+ $lei->('ls-external');
+ unlike($out, qr!https://example\.com/ibx/!s, 'removed canonical URL');
ok(!$lei->(qw(q s:prefix -o /dev/null -f maildir)), 'bad maildir');
like($err, qr!/dev/null exists and is not a directory!,
diff --git a/t/lei_external.t b/t/lei_external.t
new file mode 100644
index 00000000..1f0048a1
--- /dev/null
+++ b/t/lei_external.t
@@ -0,0 +1,18 @@
+#!perl -w
+use strict;
+use v5.10.1;
+use Test::More;
+my $cls = 'PublicInbox::LeiExternal';
+require_ok $cls;
+my $canon = $cls->can('_canonicalize');
+my $exp = 'https://example.com/my-inbox/';
+is($canon->('https://example.com/my-inbox'), $exp, 'trailing slash added');
+is($canon->('https://example.com/my-inbox//'), $exp, 'trailing slash removed');
+is($canon->('https://example.com//my-inbox/'), $exp, 'leading slash removed');
+is($canon->('https://EXAMPLE.com/my-inbox/'), $exp, 'lowercased');
+is($canon->('/this/path/is/nonexistent/'), '/this/path/is/nonexistent',
+ 'non-existent pathname canonicalized');
+is($canon->('/this//path/'), '/this/path', 'extra slashes gone');
+is($canon->('/ALL/CAPS'), '/ALL/CAPS', 'caps preserved');
+
+done_testing;
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 12/12] lei forget-external: bash completion support
2021-01-21 19:46 [PATCH 00/12] lei: another dump Eric Wong
` (10 preceding siblings ...)
2021-01-21 19:46 ` [PATCH 11/12] lei: forget-external support with canonicalization Eric Wong
@ 2021-01-21 19:46 ` Eric Wong
11 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2021-01-21 19:46 UTC (permalink / raw)
To: meta
The tricky bit was getting around word splitting bash
does on URLs. This may work with other shells, too.
---
lib/PublicInbox/LEI.pm | 4 ++++
lib/PublicInbox/LeiExternal.pm | 17 +++++++++++++++++
2 files changed, 21 insertions(+)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 9c3d7279..ef3f90fc 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -655,6 +655,10 @@ sub lei__complete {
} elsif ($cmd eq 'config' && !@argv && !$CONFIG_KEYS{$cur}) {
puts $self, grep(/$re/, keys %CONFIG_KEYS);
}
+ $cmd =~ tr/-/_/;
+ if (my $sub = $self->can("_complete_$cmd")) {
+ puts $self, $sub->($self, @argv, $cur);
+ }
# TODO: URLs, pathnames, OIDs, MIDs, etc... See optparse() for
# proto parsing.
}
diff --git a/lib/PublicInbox/LeiExternal.pm b/lib/PublicInbox/LeiExternal.pm
index 21071058..59c3c367 100644
--- a/lib/PublicInbox/LeiExternal.pm
+++ b/lib/PublicInbox/LeiExternal.pm
@@ -93,4 +93,21 @@ sub lei_forget_external {
}
}
+# shell completion helper called by lei__complete
+sub _complete_forget_external {
+ my ($self, @argv) = @_;
+ my $cfg = $self->_lei_cfg(0);
+ my $cur = pop @argv;
+ # Workaround bash word-splitting URLs to ['https', ':', '//' ...]
+ # Maybe there's a better way to go about this in
+ # contrib/completion/lei-completion.bash
+ my $colon = ($argv[-1] // '') eq ':';
+ my $re = $cur =~ /\A[\w-]/ ? '' : '.*';
+ map {
+ my $x = substr($_, length('external.'));
+ # only return the part specified on the CLI
+ $colon && $x =~ /(\Q$cur\E.*)/ ? $1 : $x;
+ } grep(/\Aexternal\.$re\Q$cur/, @{$cfg->{-section_order}});
+}
+
1;
^ permalink raw reply related [flat|nested] 13+ messages in thread