1/12 is a user-visible change, but there are no users yet :P More externals work is coming... 12/12 may be too specific to bash, so help from non-bash users is appreciated. Eric Wong (12): lei_overview: rename {relevance} => {pct} lei q: retrieve keywords for local, non-external messages lei_xsearch: eliminate some unused, commented-out code lei: show {pct} and {oid} in From_ lines and filenames lei: fix inadvertant FD sharing lei_to_mail: avoid segfault on exit lei: oneshot: use client $io[2] for placeholder lei: remove INT/QUIT/TERM handlers, fix daemon EOF lei_xsearch: reduce reference paths to lxs lei: remove @TO_CLOSE_ATFORK_CHILD lei: forget-external support with canonicalization lei forget-external: bash completion support MANIFEST | 1 + lib/PublicInbox/IPC.pm | 23 +++++++-- lib/PublicInbox/LEI.pm | 86 ++++++++++++++++++++-------------- lib/PublicInbox/LeiExternal.pm | 71 ++++++++++++++++++++++++---- lib/PublicInbox/LeiOverview.pm | 17 +++---- lib/PublicInbox/LeiQuery.pm | 21 +++++---- lib/PublicInbox/LeiSearch.pm | 16 ++----- lib/PublicInbox/LeiToMail.pm | 80 ++++++++++++++++++------------- lib/PublicInbox/LeiXSearch.pm | 60 ++++++++++++------------ lib/PublicInbox/Search.pm | 20 +++++++- script/lei | 5 -- t/lei.t | 9 ++++ t/lei_external.t | 18 +++++++ t/lei_to_mail.t | 41 +++++++++------- 14 files changed, 300 insertions(+), 168 deletions(-) create mode 100644 t/lei_external.t
The old name was too long compared to the rest of the field names. With the Xapian method being named ->get_percent, "pct" is a well-known abbreviation for "percent" and is already used internally by our wrapper. ...And clean up some excess whitespace while we're in the area. --- lib/PublicInbox/LeiOverview.pm | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index cab2b055..8799f1cc 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -135,11 +135,9 @@ sub _unbless_smsg { delete @$smsg{qw(lines bytes num tid)}; $smsg->{rt} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt $smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate - $smsg->{relevance} = get_pct($mitem) if $mitem; - + $smsg->{pct} = get_pct($mitem) if $mitem; if (my $r = delete $smsg->{references}) { - $smsg->{refs} = [ - map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ]; + $smsg->{refs} = [ map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ]; } if (my $m = delete($smsg->{mid})) { $smsg->{'m'} = "<$m>";
This isn't tested for now, so maybe it works. --- lib/PublicInbox/LeiOverview.pm | 8 +++----- lib/PublicInbox/LeiSearch.pm | 16 +++------------- lib/PublicInbox/LeiXSearch.pm | 14 ++++++++++---- lib/PublicInbox/Search.pm | 20 +++++++++++++++++++- 4 files changed, 35 insertions(+), 23 deletions(-) diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index 8799f1cc..47d9eb31 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -224,9 +224,8 @@ sub ovv_each_smsg_cb { # runs in wq worker usually my $git_dir = $git->{git_dir}; sub { my ($smsg, $mitem) = @_; - my $kw = []; # TODO get from mitem $l2m->wq_do('write_mail', \@io, $git_dir, - $smsg->{blob}, $lei_ipc, $kw) + $smsg->{blob}, $lei_ipc, $smsg->{kw}); } } elsif ($l2m) { my $wcb = $l2m->write_cb($lei); @@ -235,8 +234,8 @@ sub ovv_each_smsg_cb { # runs in wq worker usually my $g2m = $l2m->can('git_to_mail'); sub { my ($smsg, $mitem) = @_; - my $kw = []; # TODO get from mitem - $git->cat_async($smsg->{blob}, $g2m, [ $wcb, $kw ]); + $git->cat_async($smsg->{blob}, $g2m, + [ $wcb, $smsg->{kw} ]); }; } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},"; @@ -266,7 +265,6 @@ sub ovv_each_smsg_cb { # runs in wq worker usually $lei->{ovv_buf} = \(my $buf = ''); sub { my ($smsg, $mitem) = @_; - delete @$smsg{qw(tid num)}; $buf .= $json->encode(_unbless_smsg(@_)) . 
$ORS; if (length($buf) > 65536) { my $lk = $self->lock_for_scope; diff --git a/lib/PublicInbox/LeiSearch.pm b/lib/PublicInbox/LeiSearch.pm index b7e337de..440bacf5 100644 --- a/lib/PublicInbox/LeiSearch.pm +++ b/lib/PublicInbox/LeiSearch.pm @@ -5,7 +5,7 @@ package PublicInbox::LeiSearch; use strict; use v5.10.1; use parent qw(PublicInbox::ExtSearch); -use PublicInbox::Search; +use PublicInbox::Search qw(xap_terms); # get combined docid from over.num: # (not generic Xapian, only works with our sharding scheme) @@ -19,19 +19,9 @@ sub msg_keywords { my ($self, $num) = @_; # num_or_mitem my $xdb = $self->xdb; # set {nshard}; my $docid = ref($num) ? $num->get_docid : num2docid($self, $num); - my %kw; - eval { - my $end = $xdb->termlist_end($docid); - my $cur = $xdb->termlist_begin($docid); - for (; $cur != $end; $cur++) { - $cur->skip_to('K'); - last if $cur == $end; - my $kw = $cur->get_termname; - $kw =~ s/\AK//s and $kw{$kw} = undef; - } - }; + my $kw = xap_terms('K', $xdb, $docid); warn "E: #$docid ($num): $@\n" if $@; - wantarray ? sort(keys(%kw)) : \%kw; + wantarray ? 
sort(keys(%$kw)) : $kw; } 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index a6d827de..d7688ede 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -13,6 +13,7 @@ use PublicInbox::OpPipe; use PublicInbox::Import; use File::Temp 0.19 (); # 0.19 for ->newdir use File::Spec (); +use PublicInbox::Search qw(xap_terms); sub new { my ($class) = @_; @@ -74,7 +75,12 @@ sub smsg_for { my $docid = $mitem->get_docid; my $shard = ($docid - 1) % $nshard; my $num = int(($docid - 1) / $nshard) + 1; - my $smsg = $self->{shard2ibx}->[$shard]->over->get_art($num); + my $ibx = $self->{shard2ibx}->[$shard]; + my $smsg = $ibx->over->get_art($num); + if (ref($ibx->can('msg_keywords'))) { + my $kw = xap_terms('K', $mitem->get_document); + $smsg->{kw} = [ sort keys %$kw ]; + } $smsg->{docid} = $docid; $smsg; } @@ -153,11 +159,11 @@ sub query_mset { # non-parallel for non-"--thread" users $dedupe->prepare_dedupe; do { $mset = $self->mset($mo->{qstr}, $mo); - for my $it ($mset->items) { - my $smsg = smsg_for($self, $it) or next; + for my $mitem ($mset->items) { + my $smsg = smsg_for($self, $mitem) or next; wait_startq($startq) if $startq; next if $dedupe->is_smsg_dup($smsg); - $each_smsg->($smsg, $it); + $each_smsg->($smsg, $mitem); } } while (_mset_more($mset, $mo)); undef $each_smsg; # drops @io for l2m->{each_smsg_done} diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index a4b40f94..7c6a16be 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -6,7 +6,7 @@ package PublicInbox::Search; use strict; use parent qw(Exporter); -our @EXPORT_OK = qw(retry_reopen int_val get_pct); +our @EXPORT_OK = qw(retry_reopen int_val get_pct xap_terms); use List::Util qw(max); # values for searching, changing the numeric value breaks @@ -432,4 +432,22 @@ sub get_pct ($) { # mset item $n > 99 ? 
99 : $n; } +sub xap_terms ($$;@) { + my ($pfx, $xdb_or_doc, @docid) = @_; # @docid may be empty () + my %ret; + eval { + my $end = $xdb_or_doc->termlist_end(@docid); + my $cur = $xdb_or_doc->termlist_begin(@docid); + for (; $cur != $end; $cur++) { + $cur->skip_to($pfx); + last if $cur == $end; + my $tn = $cur->get_termname; + if (index($tn, $pfx) == 0) { + $ret{substr($tn, length($pfx))} = undef; + } + } + }; + \%ret; +} + 1;
--- lib/PublicInbox/LeiXSearch.pm | 5 ----- 1 file changed, 5 deletions(-) diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index d7688ede..13611882 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -291,11 +291,6 @@ sub do_query { my @pids = $self->wq_close; # for the $lei->atfork_child_wq PIPE handler: $done_op->{'!'}->[3] = \@pids; - # $done->event_step; - # my $ipc_worker_reap = $self->can('ipc_worker_reap'); - # if (my $l2m_pids = delete $self->{l2m_pids}) { - # dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids; - # } while ($done->{sock}) { $done->event_step } my $ipc_worker_reap = $self->can('ipc_worker_reap'); dwaitpid($_, $ipc_worker_reap, $self) for @pids;
From_ lines are shown when mbox* variants are output to stdout, making {oid} and {pct} information visible without risking being propagated to other importer processes if they were in lei-specific X-* headers. Maildirs already had OIDs in the filename, now they gain Xapian {pct} in case anybody cares. --- lib/PublicInbox/LeiOverview.pm | 9 ++--- lib/PublicInbox/LeiToMail.pm | 60 +++++++++++++++++++--------------- t/lei_to_mail.t | 41 +++++++++++++---------- 3 files changed, 61 insertions(+), 49 deletions(-) diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index 47d9eb31..7a4fa857 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -224,8 +224,9 @@ sub ovv_each_smsg_cb { # runs in wq worker usually my $git_dir = $git->{git_dir}; sub { my ($smsg, $mitem) = @_; - $l2m->wq_do('write_mail', \@io, $git_dir, - $smsg->{blob}, $lei_ipc, $smsg->{kw}); + $smsg->{pct} = get_pct($mitem) if $mitem; + $l2m->wq_do('write_mail', \@io, $git_dir, $smsg, + $lei_ipc); } } elsif ($l2m) { my $wcb = $l2m->write_cb($lei); @@ -234,8 +235,8 @@ sub ovv_each_smsg_cb { # runs in wq worker usually my $g2m = $l2m->can('git_to_mail'); sub { my ($smsg, $mitem) = @_; - $git->cat_async($smsg->{blob}, $g2m, - [ $wcb, $smsg->{kw} ]); + $smsg->{pct} = get_pct($mitem) if $mitem; + $git->cat_async($smsg->{blob}, $g2m, [ $wcb, $smsg ]); }; } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { my $EOR = ($1//'') eq 'concat' ? 
"\n}" : "\n},"; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 1be0b09c..3dcce9e7 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -32,14 +32,14 @@ my %kw2status = ( ); sub _mbox_hdr_buf ($$$) { - my ($eml, $type, $kw) = @_; + my ($eml, $type, $smsg) = @_; $eml->header_set($_) for (qw(Lines Bytes Content-Length)); # Messages are always 'O' (non-\Recent in IMAP), it saves # MUAs the trouble of rewriting the mbox if no other # changes are made my %hdr = (Status => [ 'O' ]); # set Status, X-Status - for my $k (@$kw) { + for my $k (@{$smsg->{kw} // []}) { if (my $ent = $kw2status{$k}) { push @{$hdr{$ent->[0]}}, $ent->[1]; } else { # X-Label? @@ -53,9 +53,11 @@ sub _mbox_hdr_buf ($$$) { # fixup old bug from import (pre-a0c07cba0e5d8b6a) $$buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s; + my $ident = $smsg->{blob} // 'lei'; + if (defined(my $pct = $smsg->{pct})) { $ident .= "=$pct" } substr($$buf, 0, 0, # prepend From line - "From lei\@$type Thu Jan 1 00:00:00 1970$eml->{crlf}"); + "From $ident\@$type Thu Jan 1 00:00:00 1970$eml->{crlf}"); $buf; } @@ -71,8 +73,8 @@ sub _print_full { } sub eml2mboxrd ($;$) { - my ($eml, $kw) = @_; - my $buf = _mbox_hdr_buf($eml, 'mboxrd', $kw); + my ($eml, $smsg) = @_; + my $buf = _mbox_hdr_buf($eml, 'mboxrd', $smsg); if (my $bdy = delete $eml->{bdy}) { $$bdy =~ s/^(>*From )/>$1/gm; $$buf .= $eml->{crlf}; @@ -84,8 +86,8 @@ sub eml2mboxrd ($;$) { } sub eml2mboxo { - my ($eml, $kw) = @_; - my $buf = _mbox_hdr_buf($eml, 'mboxo', $kw); + my ($eml, $smsg) = @_; + my $buf = _mbox_hdr_buf($eml, 'mboxo', $smsg); if (my $bdy = delete $eml->{bdy}) { $$bdy =~ s/^From />From /gm; $$buf .= $eml->{crlf}; @@ -108,8 +110,8 @@ sub _mboxcl_common ($$$) { # mboxcl still escapes "From " lines sub eml2mboxcl { - my ($eml, $kw) = @_; - my $buf = _mbox_hdr_buf($eml, 'mboxcl', $kw); + my ($eml, $smsg) = @_; + my $buf = _mbox_hdr_buf($eml, 'mboxcl', $smsg); my $crlf = $eml->{crlf}; if (my $bdy = delete 
$eml->{bdy}) { $$bdy =~ s/^From />From /gm; @@ -121,8 +123,8 @@ sub eml2mboxcl { # mboxcl2 has no "From " escaping sub eml2mboxcl2 { - my ($eml, $kw) = @_; - my $buf = _mbox_hdr_buf($eml, 'mboxcl2', $kw); + my ($eml, $smsg) = @_; + my $buf = _mbox_hdr_buf($eml, 'mboxcl2', $smsg); my $crlf = $eml->{crlf}; if (my $bdy = delete $eml->{bdy}) { _mboxcl_common($buf, $bdy, $crlf); @@ -140,10 +142,11 @@ sub git_to_mail { # git->cat_async callback warn "unexpected type=$type for $oid\n"; } } - if ($size > 0) { - my ($write_cb, $kw) = @$arg; - $write_cb->($bref, $oid, $kw); + my ($write_cb, $smsg) = @$arg; + if ($smsg->{blob} ne $oid) { + die "BUG: expected=$smsg->{blob} got=$oid"; } + $write_cb->($bref, $smsg) if $size > 0; } sub reap_compress { # dwaitpid callback @@ -247,11 +250,11 @@ sub _mbox_write_cb ($$) { my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe; sub { # for git_to_mail - my ($buf, $oid, $kw) = @_; + my ($buf, $smsg) = @_; return unless $out; my $eml = PublicInbox::Eml->new($buf); - if (!$dedupe->is_dup($eml, $oid)) { - $buf = $eml2mbox->($eml, $kw); + if (!$dedupe->is_dup($eml, $smsg->{blob})) { + $buf = $eml2mbox->($eml, $smsg); my $lk = $ovv->lock_for_scope; eval { $write->($out, $buf) }; if ($@) { @@ -283,12 +286,15 @@ sub _augment_file { # _maildir_each_file cb sub _unlink { unlink($_[0]) } sub _buf2maildir { - my ($dst, $buf, $oid, $kw) = @_; + my ($dst, $buf, $smsg) = @_; + my $kw = $smsg->{kw} // []; my $sfx = join('', sort(map { $kw2char{$_} // () } @$kw)); my $rand = ''; # chosen by die roll :P my ($tmp, $fh, $final); + my $common = $smsg->{blob}; + if (defined(my $pct = $smsg->{pct})) { $common .= "=$pct" } do { - $tmp = $dst.'tmp/'.$rand."oid=$oid"; + $tmp = $dst.'tmp/'.$rand.$common; } while (!sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY) && $! 
== EEXIST && ($rand = int(rand 0x7fffffff).',')); if (print $fh $$buf and close($fh)) { @@ -299,14 +305,14 @@ sub _buf2maildir { $dst .= 'cur/'; $rand = ''; do { - $final = $dst.$rand."oid=$oid:2,$sfx"; + $final = $dst.$rand.$common.':2,'.$sfx; } while (!link($tmp, $final) && $! == EEXIST && ($rand = int(rand 0x7fffffff).',')); unlink($tmp) or warn "W: failed to unlink $tmp: $!\n"; } else { my $err = $!; unlink($tmp); - die "Error writing $oid to $dst: $err"; + die "Error writing $smsg->{blob} to $dst: $err"; } } @@ -316,12 +322,12 @@ sub _maildir_write_cb ($$) { $dedupe->prepare_dedupe; my $dst = $lei->{ovv}->{dst}; sub { # for git_to_mail - my ($buf, $oid, $kw) = @_; - return _buf2maildir($dst, $buf, $oid, $kw) if !$dedupe; + my ($buf, $smsg) = @_; + return _buf2maildir($dst, $buf, $smsg) if !$dedupe; my $eml = PublicInbox::Eml->new($$buf); # copy buf - return if $dedupe->is_dup($eml, $oid); + return if $dedupe->is_dup($eml, $smsg->{blob}); undef $eml; - _buf2maildir($dst, $buf, $oid, $kw); + _buf2maildir($dst, $buf, $smsg); } } @@ -447,7 +453,7 @@ sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon } sub write_mail { # via ->wq_do - my ($self, $git_dir, $oid, $lei, $kw) = @_; + my ($self, $git_dir, $smsg, $lei) = @_; my $not_done = delete $self->{4}; # write end of {each_smsg_done} my $wcb = $self->{wcb} //= do { # first message my %sig = $lei->atfork_child_wq($self); @@ -456,7 +462,7 @@ sub write_mail { # via ->wq_do $self->write_cb($lei); }; my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir); - $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw, $not_done ]); + $git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]); } sub ipc_atfork_prepare { diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index 6673d9a6..47c0e3d4 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -18,11 +18,12 @@ my $noeol = "Subject: x\n\nFrom hell"; my $crlf = $noeol; $crlf =~ s/\n/\r\n/g; my $kw = [qw(seen answered flagged)]; +my 
$smsg = { kw => $kw, blob => '0'x40 }; my @MBOX = qw(mboxcl2 mboxrd mboxcl mboxo); for my $mbox (@MBOX) { my $m = "eml2$mbox"; my $cb = PublicInbox::LeiToMail->can($m); - my $s = $cb->(PublicInbox::Eml->new($from), $kw); + my $s = $cb->(PublicInbox::Eml->new($from), $smsg); is(substr($$s, -1, 1), "\n", "trailing LF in normal $mbox"); my $eml = PublicInbox::Eml->new($s); is($eml->header('Status'), 'OR', "Status: set by $m"); @@ -40,7 +41,7 @@ for my $mbox (@MBOX) { } else { is(scalar(@cl), 0, "$m clobbered Content-Length"); } - $s = $cb->(PublicInbox::Eml->new($noeol), $kw); + $s = $cb->(PublicInbox::Eml->new($noeol), $smsg); is(substr($$s, -1, 1), "\n", "trailing LF added by $m when original lacks EOL"); $eml = PublicInbox::Eml->new($s); @@ -49,7 +50,7 @@ for my $mbox (@MBOX) { } else { is($eml->body_raw, ">From hell\n", "From escaped once by $m"); } - $s = $cb->(PublicInbox::Eml->new($crlf), $kw); + $s = $cb->(PublicInbox::Eml->new($crlf), $smsg); is(substr($$s, -2, 2), "\r\n", "trailing CRLF added $m by original lacks EOL"); $eml = PublicInbox::Eml->new($s); @@ -62,7 +63,7 @@ for my $mbox (@MBOX) { is($eml->header('Content-Length') + length("\r\n"), length($eml->body_raw), "$m Content-Length matches"); } elsif ($mbox eq 'mboxrd') { - $s = $cb->($eml, $kw); + $s = $cb->($eml, $smsg); $eml = PublicInbox::Eml->new($s); is($eml->body_raw, ">>From hell\r\n\r\n", "From escaped again by $m"); @@ -102,11 +103,12 @@ my $wcb_get = sub { $cb; }; +my $deadbeef = { blob => 'deadbeef', kw => [ qw(seen) ] }; my $orig = do { my $wcb = $wcb_get->($mbox, $fn); is(ref $wcb, 'CODE', 'write_cb returned callback'); ok(-f $fn && !-s _, 'empty file created'); - $wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]); + $wcb->(\(my $dup = $buf), $deadbeef); undef $wcb; open my $fh, '<', $fn or BAIL_OUT $!; my $raw = do { local $/; <$fh> }; @@ -116,7 +118,7 @@ my $orig = do { local $lei->{opt} = { jobs => 2 }; $wcb = $wcb_get->($mbox, $fn); ok(-f $fn && !-s _, 'truncated mbox destination'); - 
$wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]); + $wcb->(\($dup = $buf), $deadbeef); undef $wcb; open $fh, '<', $fn or BAIL_OUT $!; is(do { local $/; <$fh> }, $raw, 'jobs > 1'); @@ -131,7 +133,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? ok($dc_cmd, "decompressor for .$zsfx"); my $f = "$fn.$zsfx"; my $wcb = $wcb_get->($mbox, $f); - $wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]); + $wcb->(\(my $dup = $buf), $deadbeef); undef $wcb; my $uncompressed = xqx([@$dc_cmd, $f]); is($uncompressed, $orig, "$zsfx works unlocked"); @@ -139,13 +141,13 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? local $lei->{opt} = { jobs => 2 }; # for atomic writes unlink $f or BAIL_OUT "unlink $!"; $wcb = $wcb_get->($mbox, $f); - $wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]); + $wcb->(\($dup = $buf), $deadbeef); undef $wcb; is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock"); local $lei->{opt} = { augment => 1 }; $wcb = $wcb_get->($mbox, $f); - $wcb->(\($dup = $buf . "\nx\n"), 'deadbeef', [ qw(seen) ]); + $wcb->(\($dup = $buf . "\nx\n"), $deadbeef); undef $wcb; # commit my $cat = popen_rd([@$dc_cmd, $f]); @@ -157,7 +159,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? local $lei->{opt} = { augment => 1, jobs => 2 }; $wcb = $wcb_get->($mbox, $f); - $wcb->(\($dup = $buf . "\ny\n"), 'deadbeef', [ qw(seen) ]); + $wcb->(\($dup = $buf . 
"\ny\n"), $deadbeef); undef $wcb; # commit my @raw3; @@ -179,7 +181,8 @@ my $as_orig = sub { unlink $fn or BAIL_OUT $!; if ('default deduplication uses content_hash') { my $wcb = $wcb_get->('mboxo', $fn); - $wcb->(\(my $x = $buf), 'deadbeef', []) for (1..2); + $deadbeef->{kw} = []; + $wcb->(\(my $x = $buf), $deadbeef) for (1..2); undef $wcb; # undef to commit changes my $cmp = ''; open my $fh, '<', $fn or BAIL_OUT $!; @@ -188,7 +191,7 @@ if ('default deduplication uses content_hash') { local $lei->{opt} = { augment => 1 }; $wcb = $wcb_get->('mboxo', $fn); - $wcb->(\($x = $buf . "\nx\n"), 'deadbeef', []) for (1..2); + $wcb->(\($x = $buf . "\nx\n"), $deadbeef) for (1..2); undef $wcb; # undef to commit changes open $fh, '<', $fn or BAIL_OUT $!; my @x; @@ -202,7 +205,7 @@ if ('default deduplication uses content_hash') { open my $tmp, '+>', undef or BAIL_OUT $!; local $lei->{1} = $tmp; my $wcb = $wcb_get->('mboxrd', '/dev/stdout'); - $wcb->(\(my $x = $buf), 'deadbeef', []); + $wcb->(\(my $x = $buf), $deadbeef); undef $wcb; # commit seek($tmp, 0, SEEK_SET) or BAIL_OUT $!; my $cmp = ''; @@ -216,7 +219,7 @@ SKIP: { # FIFO support mkfifo($fn, 0600) or skip("mkfifo not supported: $!", 1); my $cat = popen_rd([which('cat'), $fn]); my $wcb = $wcb_get->('mboxo', $fn); - $wcb->(\(my $x = $buf), 'deadbeef', []); + $wcb->(\(my $x = $buf), $deadbeef); undef $wcb; # commit my $cmp = ''; PublicInbox::MboxReader->mboxo($cat, sub { $cmp .= $as_orig->(@_) }); @@ -227,7 +230,8 @@ SKIP: { # FIFO support my $md = "$tmpdir/maildir/"; my $wcb = $wcb_get->('maildir', $md); is(ref($wcb), 'CODE', 'got Maildir callback'); - $wcb->(\(my $x = $buf), 'badc0ffee', []); + my $b4dc0ffee = { blob => 'badc0ffee', kw => [] }; + $wcb->(\(my $x = $buf), $b4dc0ffee); my @f; PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @f, shift }); @@ -235,7 +239,8 @@ SKIP: { # FIFO support is(do { local $/; <$fh> }, $buf, 'wrote to Maildir'); $wcb = $wcb_get->('maildir', $md); - $wcb->(\($x = $buf."\nx\n"), 
'deadcafe', []); + my $deadcafe = { blob => 'deadcafe', kw => [] }; + $wcb->(\($x = $buf."\nx\n"), $deadcafe); my @x = (); PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @x, shift }); @@ -246,8 +251,8 @@ SKIP: { # FIFO support local $lei->{opt}->{augment} = 1; $wcb = $wcb_get->('maildir', $md); - $wcb->(\($x = $buf."\ny\n"), 'deadcafe', []); - $wcb->(\($x = $buf."\ny\n"), 'b4dc0ffee', []); # skipped by dedupe + $wcb->(\($x = $buf."\ny\n"), $deadcafe); + $wcb->(\($x = $buf."\ny\n"), $b4dc0ffee); # skipped by dedupe @f = (); PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @f, shift }); is(scalar grep(/\A\Q$x[0]\E\z/, @f), 1, 'old file still there');
$wq->{-ipc_atfork_child_close} neededed to be initialized properly. And start setting $0 in workers to improve visibility. --- lib/PublicInbox/IPC.pm | 22 ++++++++++++++++++---- lib/PublicInbox/LEI.pm | 9 +++++---- lib/PublicInbox/LeiQuery.pm | 21 +++++++++++---------- lib/PublicInbox/LeiToMail.pm | 2 +- lib/PublicInbox/LeiXSearch.pm | 27 ++++++++++++--------------- 5 files changed, 47 insertions(+), 34 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 8fec2e62..24f45e03 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -134,6 +134,12 @@ sub ipc_worker_reap { # dwaitpid callback warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15; } +sub wq_wait_old { + my ($self) = @_; + my $pids = delete $self->{"-wq_old_pids.$$"} or return; + dwaitpid($_, \&ipc_worker_reap, $self) for @$pids; +} + # for base class, override in sub classes sub ipc_atfork_prepare {} @@ -370,17 +376,25 @@ sub wq_workers { } sub wq_close { - my ($self) = @_; + my ($self, $nohang) = @_; delete @$self{qw(-wq_s1 -wq_s2)} or return; my $ppid = delete $self->{-wq_ppid} or return; my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers'; return if $ppid != $$; # can't reap siblings or parents - return (keys %$workers) if wantarray; # caller will reap - for my $pid (keys %$workers) { - dwaitpid($pid, \&ipc_worker_reap, $self); + my @pids = map { $_ + 0 } keys %$workers; + if ($nohang) { + push @{$self->{"-wq_old_pids.$$"}}, @pids; + } else { + dwaitpid($_, \&ipc_worker_reap, $self) for @pids; } } +sub wq_kill_old { + my ($self) = @_; + my $pids = $self->{"-wq_old_pids.$$"} or return; + kill 'TERM', @$pids; +} + sub wq_kill { my ($self, $sig) = @_; my $workers = $self->{-wq_workers} or return; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 6be6d10b..2cb2bf40 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -281,11 +281,14 @@ sub fail ($$;$) { sub atfork_prepare_wq { my ($self, $wq) = @_; - my $tcafc 
= $wq->{-ipc_atfork_child_close}; + my $tcafc = $wq->{-ipc_atfork_child_close} //= []; push @$tcafc, @TO_CLOSE_ATFORK_CHILD; if (my $sock = $self->{sock}) { push @$tcafc, @$self{qw(0 1 2)}, $sock; } + if (my $pgr = $self->{pgr}) { + push @$tcafc, @$pgr[1,2]; + } for my $f (qw(lxs l2m)) { my $ipc = $self->{$f} or next; push @$tcafc, grep { defined } @@ -335,9 +338,7 @@ sub atfork_parent_wq { my $l2m = $ret->{l2m}; if ($l2m && $l2m != $wq) { # $wq == lxs $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1}; - if (my @pids = $l2m->wq_close) { - $wq->{l2m_pids} = \@pids; - } + $l2m->wq_close(1); } ($ret, @io); } diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 941bc299..7d634b5e 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -32,24 +32,25 @@ sub lei_q { my $sto = $self->_lei_store(1); push @srcs, $sto->search; } - my $lxs = PublicInbox::LeiXSearch->new; + my $lxs = $self->{lxs} = PublicInbox::LeiXSearch->new; # --external is enabled by default, but allow --no-external - if ($opt->{external} // 1) { + if ($opt->{external} //= 1) { $self->_externals_each(\&_vivify_external, \@srcs); } - my $j = $opt->{jobs} // (scalar(@srcs) > 3 ? 3 : scalar(@srcs)); - $j = 1 if !$opt->{thread}; - $self->atfork_prepare_wq($lxs); - $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset); - $self->{lxs} = $lxs; - + my $xj = $opt->{jobs} // (scalar(@srcs) > 3 ? 
3 : scalar(@srcs)); + $xj = 1 if !$opt->{thread}; my $ovv = PublicInbox::LeiOverview->new($self) or return; + $self->atfork_prepare_wq($lxs); + $lxs->wq_workers_start('lei_xsearch', $xj, $self->oldset); + delete $lxs->{-ipc_atfork_child_close}; if (my $l2m = $self->{l2m}) { - $j = 4 if $j <= 4; # TODO configurable + my $mj = 4; # TODO: configurable $self->atfork_prepare_wq($l2m); - $l2m->wq_workers_start('lei2mail', $j, $self->oldset); + $l2m->wq_workers_start('lei2mail', $mj, $self->oldset); + delete $l2m->{-ipc_atfork_child_close}; } + # no forking workers after this my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset); diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 3dcce9e7..87cc9c47 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -467,7 +467,7 @@ sub write_mail { # via ->wq_do sub ipc_atfork_prepare { my ($self) = @_; - # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr) + # (done_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr) $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= >&=]); $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 13611882..7b33677e 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -110,6 +110,7 @@ sub wait_startq ($) { sub query_thread_mset { # for --thread my ($self, $lei, $ibxish) = @_; + local $0 = "$0 query_thread_mset"; my $startq = delete $self->{5}; my %sig = $lei->atfork_child_wq($self); local @SIG{keys %sig} = values %sig; @@ -148,6 +149,7 @@ sub query_thread_mset { # for --thread sub query_mset { # non-parallel for non-"--thread" users my ($self, $lei, $srcs) = @_; + local $0 = "$0 query_mset"; my $startq = delete $self->{5}; my %sig = $lei->atfork_child_wq($self); local @SIG{keys %sig} = values %sig; @@ -192,12 +194,10 @@ sub git { sub query_done { # EOF callback my ($self, $lei) = @_; my $l2m = delete $lei->{l2m}; - if 
(my $pids = delete $self->{l2m_pids}) { - my $ipc_worker_reap = $self->can('ipc_worker_reap'); - dwaitpid($_, $ipc_worker_reap, $l2m) for @$pids; - } + $l2m->wq_wait_old if $l2m; + $self->wq_wait_old; $lei->{ovv}->ovv_end($lei); - if ($l2m) { # calls LeiToMail reap_compress + if ($l2m) { # close() calls LeiToMail reap_compress close(delete($lei->{1})) if $lei->{1}; $lei->start_mua; } @@ -232,12 +232,12 @@ sub start_query { # always runs in main (lei-daemon) process for my $rmt (@$remotes) { $self->wq_do('query_thread_mbox', $io, $lei, $rmt); } - close $io->[0]; # qry_status_wr @$io = (); } sub query_prepare { # called by wq_do my ($self, $lei) = @_; + local $0 = "$0 query_prepare"; my %sig = $lei->atfork_child_wq($self); -p $lei->{0} or die "BUG: \$done pipe expected"; local @SIG{keys %sig} = values %sig; @@ -246,11 +246,11 @@ sub query_prepare { # called by wq_do syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!"; } -sub sigpipe_handler { - my ($self, $lei_orig, $pids) = @_; - if ($pids) { # one-shot (no event loop) - kill 'TERM', @$pids; +sub sigpipe_handler { # handles SIGPIPE from wq workers + my ($self, $lei_orig) = @_; + if ($self->wq_kill_old) { kill 'PIPE', $$; + $self->wq_wait_old; } else { $self->wq_kill; $self->wq_close; @@ -287,19 +287,16 @@ sub do_query { $io[1] = $zpipe->[1] if $zpipe; } start_query($self, \@io, $lei, $srcs); + $self->wq_close(1); unless ($in_loop) { - my @pids = $self->wq_close; # for the $lei->atfork_child_wq PIPE handler: - $done_op->{'!'}->[3] = \@pids; while ($done->{sock}) { $done->event_step } - my $ipc_worker_reap = $self->can('ipc_worker_reap'); - dwaitpid($_, $ipc_worker_reap, $self) for @pids; } } sub ipc_atfork_prepare { my ($self) = @_; - # (0: qry_status_wr, 1: stdout|mbox, 2: stderr, + # (0: done_wr, 1: stdout|mbox, 2: stderr, # 3: sock, 4: $l2m->{-wq_s1}, 5: $startq) $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&= <&=]); $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
Worker exit causes DESTROY ordering to become unpredictable and leads to Perl segfaulting. Instead, rely on OnDestroy and explicit triggering after wq_worker_loop to ensure we finish all outstanding git requests before worker exit. --- lib/PublicInbox/LeiToMail.pm | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 87cc9c47..cea68319 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -11,6 +11,7 @@ use PublicInbox::Lock; use PublicInbox::ProcessPipe; use PublicInbox::Spawn qw(which spawn popen_rd); use PublicInbox::LeiDedupe; +use PublicInbox::OnDestroy; use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); @@ -472,12 +473,21 @@ sub ipc_atfork_prepare { $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC } -sub DESTROY { +# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY +# ordering is unstable at worker exit and may cause segfaults +sub reap_gits { my ($self) = @_; - for my $pid_git (grep(/\A$$\0/, keys %$self)) { - $self->{$pid_git}->async_wait_all; + for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) { + $git->async_wait_all; } - $self->SUPER::DESTROY; # PublicInbox::IPC +} + +sub ipc_atfork_child { # runs after IPC::wq_worker_loop + my ($self) = @_; + $self->SUPER::ipc_atfork_child; + # reap_gits needs to run before $self->DESTROY, + # IPC.pm will ensure that. + PublicInbox::OnDestroy->new($$, \&reap_gits, $self); } 1;
STDERR may actually get closed in ->ipc_atfork_child in oneshot mode, so ensure we pass in a valid file handle to avoid warnings in ->wq_do. --- lib/PublicInbox/LEI.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 2cb2bf40..11ea385f 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -334,7 +334,7 @@ sub atfork_parent_wq { $self->{env} = $env; delete @$ret{qw(-lei_store cfg pgr lxs)}; # keep l2m my @io = delete @$ret{0..2}; - $io[3] = delete($ret->{sock}) // *STDERR{GLOB}; + $io[3] = delete($ret->{sock}) // $io[2]; my $l2m = $ret->{l2m}; if ($l2m && $l2m != $wq) { # $wq == lxs $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1};
The signal handlers on the client side were unnecessary, all we need is to handle socket EOF properly in the daemon by killing xsearch and l2m workers. --- lib/PublicInbox/IPC.pm | 1 + lib/PublicInbox/LEI.pm | 9 ++++++++- script/lei | 5 ----- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 24f45e03..dbb87e4e 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -408,6 +408,7 @@ sub DESTROY { my $ppid = $self->{-wq_ppid}; wq_kill($self) if $ppid && $ppid == $$; wq_close($self); + wq_wait_old($self); ipc_worker_stop($self); } diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 11ea385f..ccfc1649 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -767,7 +767,14 @@ sub accept_dispatch { # Listener {post_accept} callback sub dclose { my ($self) = @_; - delete $self->{lxs}; # stops LeiXSearch queries + for my $f (qw(lxs l2m)) { + my $wq = delete $self->{$f} or next; + if ($wq->wq_kill) { + $self->wq_close + } elsif ($wq->wq_kill_old) { + $wq->wq_wait_old; + } + } close(delete $self->{1}) if $self->{1}; # may reap_compress $self->close if $self->{sock}; # PublicInbox::DS::close } diff --git a/script/lei b/script/lei index a4a0217b..8dcea562 100755 --- a/script/lei +++ b/script/lei @@ -81,11 +81,6 @@ Falling back to (slow) one-shot mode while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" } $buf .= "\0\0"; $send_cmd->($sock, [ 0, 1, 2, fileno($dh) ], $buf, MSG_EOR); - $SIG{TERM} = $SIG{INT} = $SIG{QUIT} = sub { - my ($sig) = @_; # 'TERM', not an integer :< - $SIG{$sig} = 'DEFAULT'; - kill($sig, $$); # exit($signo + 128) - }; my $x_it_code = 0; while (1) { my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33);
Having an extra reference to LeiXSearch from the OpPipe $done_op map is unnecessary and makes the reference graph more complex than it needs to be. Just use $lei->{lxs} to simplify and reduce the likelihood of bugs. --- lib/PublicInbox/LeiXSearch.pm | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 7b33677e..987a9896 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -192,12 +192,14 @@ sub git { } sub query_done { # EOF callback - my ($self, $lei) = @_; - my $l2m = delete $lei->{l2m}; - $l2m->wq_wait_old if $l2m; - $self->wq_wait_old; + my ($lei) = @_; + my $has_l2m = exists $lei->{l2m}; + for my $f (qw(lxs l2m)) { + my $wq = delete $lei->{$f} or next; + $wq->wq_wait_old; + } $lei->{ovv}->ovv_end($lei); - if ($l2m) { # close() calls LeiToMail reap_compress + if ($has_l2m) { # close() calls LeiToMail reap_compress close(delete($lei->{1})) if $lei->{1}; $lei->start_mua; } @@ -246,16 +248,14 @@ sub query_prepare { # called by wq_do syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!"; } -sub sigpipe_handler { # handles SIGPIPE from wq workers - my ($self, $lei_orig) = @_; - if ($self->wq_kill_old) { +sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers + my ($lei) = @_; + my $lxs = delete $lei->{lxs}; + if ($lxs && $lxs->wq_kill_old) { kill 'PIPE', $$; - $self->wq_wait_old; - } else { - $self->wq_kill; - $self->wq_close; + $lxs->wq_wait_old; } - close(delete $lei_orig->{1}) if $lei_orig->{1}; + close(delete $lei->{1}) if $lei->{1}; } sub do_query { @@ -266,8 +266,8 @@ sub do_query { $lei_orig->event_step_init; # wait for shutdowns my $done_op = { - '' => [ \&query_done, $self, $lei_orig ], - '!' => [ \&sigpipe_handler, $self, $lei_orig ] + '' => [ \&query_done, $lei_orig ], + '!' 
=> [ \&sigpipe_handler, $lei_orig ] }; my $in_loop = exists $lei_orig->{sock}; $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
..At least limit it to a single file handle. The write end of the EOFpipe can be limited in scope and auto-closed when $quit is clobbered, leaving only the listener. The listener is the only handle that needs to be closed explicitly due to it being on the stack in the Listener->event_step => accept_dispatch => lei_$FOO code path. Everything else gets clobbered by DS->Reset in children after forking. --- lib/PublicInbox/LEI.pm | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index ccfc1649..37b45a00 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -27,7 +27,7 @@ use Text::Wrap qw(wrap); use File::Path qw(mkpath); use File::Spec; our $quit = \&CORE::exit; -our ($current_lei, $errors_log); +our ($current_lei, $errors_log, $listener); my ($recv_cmd, $send_cmd); my $GLP = Getopt::Long::Parser->new; $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev)); @@ -35,7 +35,6 @@ my $GLP_PASS = Getopt::Long::Parser->new; $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through)); our %PATH2CFG; # persistent for socket daemon -our @TO_CLOSE_ATFORK_CHILD; # TBD: this is a documentation mechanism to show a subcommand # (may) pass options through to another command: @@ -281,8 +280,7 @@ sub fail ($$;$) { sub atfork_prepare_wq { my ($self, $wq) = @_; - my $tcafc = $wq->{-ipc_atfork_child_close} //= []; - push @$tcafc, @TO_CLOSE_ATFORK_CHILD; + my $tcafc = $wq->{-ipc_atfork_child_close} //= [ $listener // () ]; if (my $sock = $self->{sock}) { push @$tcafc, @$self{qw(0 1 2)}, $sock; } @@ -307,7 +305,6 @@ sub atfork_child_wq { %PATH2CFG = (); undef $errors_log; $quit = \&CORE::exit; - @TO_CLOSE_ATFORK_CHILD = (); (__WARN__ => sub { err($self, @_) }, PIPE => sub { $self->x_it(13); # SIGPIPE = 13 @@ -837,12 +834,12 @@ sub lazy_start { die "connect($path): $!"; } umask(077) // die("umask(077): $!"); - socket(my $l, AF_UNIX, SOCK_SEQPACKET, 0) or die 
"socket: $!"; - bind($l, pack_sockaddr_un($path)) or die "bind($path): $!"; - listen($l, 1024) or die "listen: $!"; + local $listener; + socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!"; + bind($listener, pack_sockaddr_un($path)) or die "bind($path): $!"; + listen($listener, 1024) or die "listen: $!"; my @st = stat($path) or die "stat($path): $!"; my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino - pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; local $oldset = PublicInbox::DS::block_signals(); if ($narg == 5) { $send_cmd = PublicInbox::Spawn->can('send_cmd4'); @@ -869,20 +866,21 @@ sub lazy_start { return if $pid; $0 = "lei-daemon $path"; local %PATH2CFG; - local @TO_CLOSE_ATFORK_CHILD = ($l, $eof_w); - $l->blocking(0); - $l = PublicInbox::Listener->new($l, \&accept_dispatch, $l); + $listener->blocking(0); my $exit_code; - local $quit = sub { - $exit_code //= shift; - my $listener = $l or exit($exit_code); - # closing eof_w triggers \&noop wakeup - $eof_w = $l = $path = undef; - $listener->close; # DS::close - PublicInbox::DS->SetLoopTimeout(1000); + my $pil = PublicInbox::Listener->new($listener, \&accept_dispatch); + local $quit = do { + pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; + PublicInbox::EOFpipe->new($eof_r, \&noop, undef); + sub { + $exit_code //= shift; + my $lis = $pil or exit($exit_code); + # closing eof_w triggers \&noop wakeup + $listener = $eof_w = $pil = $path = undef; + $lis->close; # DS::close + PublicInbox::DS->SetLoopTimeout(1000); + }; }; - PublicInbox::EOFpipe->new($eof_r, \&noop, undef); - undef $eof_r; my $sig = { CHLD => \&PublicInbox::DS::enqueue_reap, QUIT => $quit,
For proper matching, we'll do a better job canonicalizing URLs and path names for matching. Of course, users may edit the file outside of lei, so ensure we try both the canonicalized and as-is form provided by the user. I also don't think we'll need to store externals info in MiscIdx; just the config file is fine. --- MANIFEST | 1 + lib/PublicInbox/LEI.pm | 24 ++++++++++----- lib/PublicInbox/LeiExternal.pm | 54 +++++++++++++++++++++++++++------- t/lei.t | 9 ++++++ t/lei_external.t | 18 ++++++++++++ 5 files changed, 88 insertions(+), 18 deletions(-) create mode 100644 t/lei_external.t diff --git a/MANIFEST b/MANIFEST index 0de1de4a..ddee1539 100644 --- a/MANIFEST +++ b/MANIFEST @@ -339,6 +339,7 @@ t/kqnotify.t t/lei-oneshot.t t/lei.t t/lei_dedupe.t +t/lei_external.t t/lei_overview.t t/lei_store.t t/lei_to_mail.t diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 37b45a00..9c3d7279 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -21,7 +21,7 @@ use PublicInbox::Config; use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET); use PublicInbox::Sigfd; use PublicInbox::DS qw(now dwaitpid); -use PublicInbox::Spawn qw(spawn run_die popen_rd); +use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::OnDestroy; use Text::Wrap qw(wrap); use File::Path qw(mkpath); @@ -95,7 +95,7 @@ our %CMD = ( # sorted in order of importance/use: qw(boost=i quiet|q) ], 'ls-external' => [ '[FILTER...]', 'list publicinbox|extindex locations', qw(format|f=s z|0 local remote quiet|q) ], -'forget-external' => [ '{URL_OR_PATHNAME|--prune}', +'forget-external' => [ 'URL_OR_PATHNAME...|--prune', 'exclude further results from a publicinbox|extindex', qw(prune quiet|q) ], @@ -114,7 +114,7 @@ our %CMD = ( # sorted in order of importance/use: "exclude message(s) on stdin from `q' search results", qw(stdin| oid=s exact by-mid|mid:s quiet|q) ], -'purge-mailsource' => [ '{URL_OR_PATHNAME|--all}', +'purge-mailsource' => [ 'URL_OR_PATHNAME|--all', 'remove imported 
messages from IMAP, Maildirs, and MH', qw(exact! all jobs:i indexed) ], @@ -137,7 +137,7 @@ our %CMD = ( # sorted in order of importance/use: 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch', qw(prune) ], -'import' => [ '{URL_OR_PATHNAME|--stdin}', +'import' => [ 'URL_OR_PATHNAME|--stdin', 'one-shot import/update from URL or filesystem', qw(stdin| offset=i recursive|r exclude=s include=s !flags), ], @@ -468,6 +468,7 @@ sub optparse ($$$) { last; } # else continue looping } + last if $ok; my $last = pop @or; $err = join(', ', @or) . " or $last must be set"; } else { @@ -547,16 +548,23 @@ sub lei_mark { my ($self, @argv) = @_; } -sub lei_config { +sub _config { my ($self, @argv) = @_; - $self->{opt}->{'config-file'} and return fail $self, - "config file switches not supported by `lei config'"; my $env = $self->{env}; delete local $env->{GIT_CONFIG}; + delete local $ENV{GIT_CONFIG}; my $cfg = _lei_cfg($self, 1); my $cmd = [ qw(git config -f), $cfg->{'-f'}, @argv ]; my %rdr = map { $_ => $self->{$_} } (0..2); - run_die($cmd, $env, \%rdr); + waitpid(spawn($cmd, $env, \%rdr), 0); +} + +sub lei_config { + my ($self, @argv) = @_; + $self->{opt}->{'config-file'} and return fail $self, + "config file switches not supported by `lei config'"; + _config(@_); + x_it($self, $?) if $?; } sub lei_init { diff --git a/lib/PublicInbox/LeiExternal.pm b/lib/PublicInbox/LeiExternal.pm index 64faf5a0..21071058 100644 --- a/lib/PublicInbox/LeiExternal.pm +++ b/lib/PublicInbox/LeiExternal.pm @@ -7,6 +7,7 @@ use strict; use v5.10.1; use parent qw(Exporter); our @EXPORT = qw(lei_ls_external lei_add_external lei_forget_external); +use PublicInbox::Config; sub _externals_each { my ($self, $cb, @arg) = @_; @@ -30,7 +31,6 @@ sub _externals_each { sub lei_ls_external { my ($self, @argv) = @_; - my $stor = $self->_lei_store(0); my $out = $self->{1}; my ($OFS, $ORS) = $self->{opt}->{z} ? 
("\0", "\0\0") : (" ", "\n"); $self->_externals_each(sub { @@ -39,24 +39,58 @@ sub lei_ls_external { }); } +sub _canonicalize { + my ($location) = @_; + if ($location !~ m!\Ahttps?://!) { + PublicInbox::Config::rel2abs_collapsed($location); + } else { + require URI; + my $uri = URI->new($location)->canonical; + my $path = $uri->path . '/'; + $path =~ tr!/!/!s; # squeeze redundant '/' + $uri->path($path); + $uri->as_string; + } +} + sub lei_add_external { - my ($self, $url_or_dir) = @_; + my ($self, $location) = @_; my $cfg = $self->_lei_cfg(1); - if ($url_or_dir !~ m!\Ahttps?://!) { - $url_or_dir = File::Spec->canonpath($url_or_dir); - } my $new_boost = $self->{opt}->{boost} // 0; - my $key = "external.$url_or_dir.boost"; + $location = _canonicalize($location); + my $key = "external.$location.boost"; my $cur_boost = $cfg->{$key}; return if defined($cur_boost) && $cur_boost == $new_boost; # idempotent $self->lei_config($key, $new_boost); - my $stor = $self->_lei_store(1); - # TODO: add to MiscIdx - $stor->done; + $self->_lei_store(1)->done; # just create the store } sub lei_forget_external { - # TODO + my ($self, @locations) = @_; + my $cfg = $self->_lei_cfg(1); + my $quiet = $self->{opt}->{quiet}; + for my $loc (@locations) { + my (@unset, @not_found); + for my $l ($loc, _canonicalize($loc)) { + my $key = "external.$l.boost"; + delete($cfg->{$key}); + $self->_config('--unset', $key); + if ($? == 0) { + push @unset, $key; + } elsif (($? 
>> 8) == 5) { + push @not_found, $key; + } else { + $self->err("# --unset $key error"); + return $self->x_it($?); + } + } + if (@unset) { + next if $quiet; + $self->err("# $_ unset") for @unset; + } elsif (@not_found) { + $self->err("# $_ not found") for @not_found; + } # else { already exited + } } 1; diff --git a/t/lei.t b/t/lei.t index ef820fe3..50ad2bb1 100644 --- a/t/lei.t +++ b/t/lei.t @@ -180,6 +180,15 @@ my $test_external = sub { }); $lei->('ls-external'); like($out, qr/boost=0\n/s, 'ls-external has output'); + ok($lei->(qw(add-external -q https://EXAMPLE.com/ibx)), 'add remote'); + is($err, '', 'no warnings after add-external'); + $lei->('ls-external'); + like($out, qr!https://example\.com/ibx/!s, 'added canonical URL'); + is($err, '', 'no warnings on ls-external'); + ok($lei->(qw(forget-external -q https://EXAMPLE.com/ibx)), + 'forget'); + $lei->('ls-external'); + unlike($out, qr!https://example\.com/ibx/!s, 'removed canonical URL'); ok(!$lei->(qw(q s:prefix -o /dev/null -f maildir)), 'bad maildir'); like($err, qr!/dev/null exists and is not a directory!, diff --git a/t/lei_external.t b/t/lei_external.t new file mode 100644 index 00000000..1f0048a1 --- /dev/null +++ b/t/lei_external.t @@ -0,0 +1,18 @@ +#!perl -w +use strict; +use v5.10.1; +use Test::More; +my $cls = 'PublicInbox::LeiExternal'; +require_ok $cls; +my $canon = $cls->can('_canonicalize'); +my $exp = 'https://example.com/my-inbox/'; +is($canon->('https://example.com/my-inbox'), $exp, 'trailing slash added'); +is($canon->('https://example.com/my-inbox//'), $exp, 'trailing slash removed'); +is($canon->('https://example.com//my-inbox/'), $exp, 'leading slash removed'); +is($canon->('https://EXAMPLE.com/my-inbox/'), $exp, 'lowercased'); +is($canon->('/this/path/is/nonexistent/'), '/this/path/is/nonexistent', + 'non-existent pathname canonicalized'); +is($canon->('/this//path/'), '/this/path', 'extra slashes gone'); +is($canon->('/ALL/CAPS'), '/ALL/CAPS', 'caps preserved'); + +done_testing;
The tricky bit was getting around the word splitting bash does on URLs. This may work with other shells, too. --- lib/PublicInbox/LEI.pm | 4 ++++ lib/PublicInbox/LeiExternal.pm | 17 +++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 9c3d7279..ef3f90fc 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -655,6 +655,10 @@ sub lei__complete { } elsif ($cmd eq 'config' && !@argv && !$CONFIG_KEYS{$cur}) { puts $self, grep(/$re/, keys %CONFIG_KEYS); } + $cmd =~ tr/-/_/; + if (my $sub = $self->can("_complete_$cmd")) { + puts $self, $sub->($self, @argv, $cur); + } # TODO: URLs, pathnames, OIDs, MIDs, etc... See optparse() for # proto parsing. } diff --git a/lib/PublicInbox/LeiExternal.pm b/lib/PublicInbox/LeiExternal.pm index 21071058..59c3c367 100644 --- a/lib/PublicInbox/LeiExternal.pm +++ b/lib/PublicInbox/LeiExternal.pm @@ -93,4 +93,21 @@ sub lei_forget_external { } } +# shell completion helper called by lei__complete +sub _complete_forget_external { + my ($self, @argv) = @_; + my $cfg = $self->_lei_cfg(0); + my $cur = pop @argv; + # Workaround bash word-splitting URLs to ['https', ':', '//' ...] + # Maybe there's a better way to go about this in + # contrib/completion/lei-completion.bash + my $colon = ($argv[-1] // '') eq ':'; + my $re = $cur =~ /\A[\w-]/ ? '' : '.*'; + map { + my $x = substr($_, length('external.')); + # only return the part specified on the CLI + $colon && $x =~ /(\Q$cur\E.*)/ ? $1 : $x; + } grep(/\Aexternal\.$re\Q$cur/, @{$cfg->{-section_order}}); +} + 1;