* [PATCH 0/7] "lei q -o imaps://..." support
@ 2021-02-21 7:41 Eric Wong
2021-02-21 7:41 ` [PATCH 1/7] inbox_writable: require PublicInbox::MdirReader Eric Wong
` (6 more replies)
0 siblings, 7 replies; 8+ messages in thread
From: Eric Wong @ 2021-02-21 7:41 UTC
To: meta
-a/--augment dedupe is now parallel for both Maildirs and IMAP
stores (probably not worth the serialization cost for mbox*).

LeiAuth remains inefficient, unfortunately, but wq_broadcast
has been added to address it in the future.

The parallelization work for IMAP for "lei q" can also be done
for "lei convert" and "lei import", but it'll probably be opt-in
in case people care about preserving UID order.
Eric Wong (7):
inbox_writable: require PublicInbox::MdirReader
lei q: support IMAP/IMAPS --output destinations
ipc: add wq_broadcast
lei q: move augment into lei2mail workers
ipc: support setting a locked number of WQ workers
net_reader: use and accept URIimap objects in more places
lei2mail: parallel augment for lock-free stores
lib/PublicInbox/IPC.pm | 35 +++++++--
lib/PublicInbox/InboxWritable.pm | 1 +
lib/PublicInbox/LeiAuth.pm | 2 +-
lib/PublicInbox/LeiOverview.pm | 7 +-
lib/PublicInbox/LeiQuery.pm | 24 +++++--
lib/PublicInbox/LeiToMail.pm | 93 ++++++++++++++++++++++--
lib/PublicInbox/LeiXSearch.pm | 48 ++++++-------
lib/PublicInbox/NetReader.pm | 75 +++++++++++---------
lib/PublicInbox/NetWriter.pm | 12 ++++
lib/PublicInbox/WQWorker.pm | 8 +--
lib/PublicInbox/Watch.pm | 11 +--
t/ipc.t | 39 +++++-----
t/lei-externals.t | 3 +-
xt/net_writer-imap.t | 118 ++++++++++++++++++++++++++++---
14 files changed, 362 insertions(+), 114 deletions(-)
* [PATCH 1/7] inbox_writable: require PublicInbox::MdirReader
2021-02-21 7:41 [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
@ 2021-02-21 7:41 ` Eric Wong
2021-02-21 7:41 ` [PATCH 2/7] lei q: support IMAP/IMAPS --output destinations Eric Wong
` (5 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-02-21 7:41 UTC
To: meta
This wasn't causing any known failures, but it may have caused
unnoticed ones, or could in the future.
---
lib/PublicInbox/InboxWritable.pm | 1 +
1 file changed, 1 insertion(+)
diff --git a/lib/PublicInbox/InboxWritable.pm b/lib/PublicInbox/InboxWritable.pm
index d4a9040f..c2baeba6 100644
--- a/lib/PublicInbox/InboxWritable.pm
+++ b/lib/PublicInbox/InboxWritable.pm
@@ -153,6 +153,7 @@ sub import_maildir {
}
my $im = $self->importer(1);
my @self = $self->filter($im) ? ($self) : ();
+ require PublicInbox::MdirReader;
PublicInbox::MdirReader::maildir_each_file(\&_each_maildir_fn,
$im, @self);
$im->done;
* [PATCH 2/7] lei q: support IMAP/IMAPS --output destinations
2021-02-21 7:41 [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
2021-02-21 7:41 ` [PATCH 1/7] inbox_writable: require PublicInbox::MdirReader Eric Wong
@ 2021-02-21 7:41 ` Eric Wong
2021-02-21 7:41 ` [PATCH 3/7] ipc: add wq_broadcast Eric Wong
` (4 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-02-21 7:41 UTC
To: meta
Augment (and dedupe) aren't parallel yet, so it's more sensitive to
high-latency networks.
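
For readers skimming the diff, here is a minimal standalone sketch of
how the LeiOverview change tells a URL destination (imaps://...) apart
from a "format:path" destination (Maildir:/path/). classify_dst is a
hypothetical helper written only for illustration; it is not part of
public-inbox, and the wording of its error messages is an assumption.

```perl
use strict;
use warnings;

# Hypothetical helper (illustration only): returns ($fmt, $dst)
sub classify_dst {
	my ($dst, $fmt) = @_;
	$fmt = lc($fmt) if defined $fmt;
	if ($dst =~ m!\A([a-z0-9\+]+)://!i) { # URL, e.g. imaps://host/INBOX
		# an explicit format makes no sense when the URL names one
		die "format=$fmt invalid with URL $dst\n" if defined $fmt;
		return (lc $1, $dst); # the URL itself is left intact
	} elsif ($dst =~ s/\A([a-z0-9]+)://i) { # e.g. Maildir:/home/user/Mail/
		my $ofmt = lc $1;
		$fmt //= $ofmt;
		die "format=$fmt conflicts with $ofmt prefix\n" if $fmt ne $ofmt;
	}
	($fmt, $dst); # the prefix (if any) was stripped from $dst
}
```

The URL test has to come first: "imaps://host/INBOX" would otherwise
match the shorter "format:" pattern and lose its scheme.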
---
lib/PublicInbox/LeiAuth.pm | 2 +-
lib/PublicInbox/LeiOverview.pm | 7 +-
lib/PublicInbox/LeiQuery.pm | 18 ++++-
lib/PublicInbox/LeiToMail.pm | 56 +++++++++++++++-
lib/PublicInbox/NetReader.pm | 7 +-
lib/PublicInbox/NetWriter.pm | 12 ++++
xt/net_writer-imap.t | 118 ++++++++++++++++++++++++++++++---
7 files changed, 202 insertions(+), 18 deletions(-)
diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index 7acb9900..bf0110ed 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -63,7 +63,7 @@ sub ipc_atfork_child {
}
sub new {
- my ($cls, $nrd) = @_;
+ my ($cls, $nrd) = @_; # nrd may be NetReader or descendant (NetWriter)
bless { nrd => $nrd }, $cls;
}
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 3169bae6..4db1d8c8 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -59,7 +59,12 @@ sub new {
my $fmt = $opt->{$ofmt_key};
$fmt = lc($fmt) if defined $fmt;
- if ($dst =~ s/\A([a-z0-9]+)://is) { # e.g. Maildir:/home/user/Mail/
+ if ($dst =~ m!\A([a-z0-9\+]+)://!is) {
+ defined($fmt) and return $lei->fail(<<"");
+--$ofmt_key=$fmt invalid with URL $dst
+
+ $fmt = lc $1;
+ } elsif ($dst =~ s/\A([a-z0-9]+)://is) { # e.g. Maildir:/home/user/Mail/
my $ofmt = lc $1;
$fmt //= $ofmt;
return $lei->fail(<<"") if $fmt ne $ofmt;
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index f71beae6..eaf91f2e 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -11,14 +11,26 @@ sub prep_ext { # externals_each callback
$lxs->prepare_external($loc) unless $exclude->{$loc};
}
-sub qstr_add { # for --stdin
+sub _start_query {
+ my ($self) = @_;
+ if (my $nwr = $self->{nwr}) {
+ require PublicInbox::LeiAuth;
+ my $auth = $self->{auth} = PublicInbox::LeiAuth->new($nwr);
+ my $lxs = $self->{lxs};
+ $auth->auth_start($self, $lxs->can('do_query'), $lxs, $self);
+ } else {
+ $self->{lxs}->do_query($self);
+ }
+}
+
+sub qstr_add { # PublicInbox::InputPipe::consume callback for --stdin
my ($self) = @_; # $_[1] = $rbuf
if (defined($_[1])) {
$_[1] eq '' and return eval {
my $lse = delete $self->{lse};
$lse->query_approxidate($lse->git,
$self->{mset_opt}->{qstr});
- $self->{lxs}->do_query($self);
+ _start_query($self);
};
$self->{mset_opt}->{qstr} .= $_[1];
} else {
@@ -115,7 +127,7 @@ no query allowed on command-line with --stdin
return;
}
$mset_opt{qstr} = $lse->query_argv_to_string($lse->git, \@argv);
- $lxs->do_query($self);
+ _start_query($self);
}
# shell completion helper called by lei__complete
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index e89cca71..0e0b0a43 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -331,9 +331,31 @@ sub _maildir_write_cb ($$) {
}
}
+sub _imap_write_cb ($$) {
+ my ($self, $lei) = @_;
+ my $dedupe = $lei->{dedupe};
+ $dedupe->prepare_dedupe if $dedupe;
+ my $imap_append = $lei->{nwr}->can('imap_append');
+ my $mic = $lei->{nwr}->mic_get($lei->{ovv}->{dst});
+ my $folder = $self->{uri}->mailbox;
+ sub { # for git_to_mail
+ my ($bref, $smsg, $eml) = @_;
+ $mic // return $lei->fail; # dst may be undef-ed in last run
+ if ($dedupe) {
+ $eml //= PublicInbox::Eml->new($$bref); # copy bref
+ return if $dedupe->is_dup($eml, $smsg->{blob});
+ }
+ eval { $imap_append->($mic, $folder, $bref, $smsg, $eml) };
+ if (my $err = $@) {
+ undef $mic;
+ die $err;
+ }
+ }
+}
+
sub write_cb { # returns a callback for git_to_mail
my ($self, $lei) = @_;
- # _mbox_write_cb or _maildir_write_cb
+ # _mbox_write_cb, _maildir_write_cb or _imap_write_cb
my $m = "_$self->{base_type}_write_cb";
$self->$m($lei);
}
@@ -360,6 +382,18 @@ sub new {
"$dst exists and is not a writable file\n";
$self->can("eml2$fmt") or die "bad mbox format: $fmt\n";
$self->{base_type} = 'mbox';
+ } elsif ($fmt =~ /\Aimaps?\z/) { # TODO .onion support
+ require PublicInbox::NetWriter;
+ my $nwr = PublicInbox::NetWriter->new;
+ $nwr->add_url($dst);
+ $nwr->{quiet} = $lei->{opt}->{quiet};
+ my $err = $nwr->errors($dst);
+ return $lei->fail($err) if $err;
+ require PublicInbox::URIimap; # TODO: URI cast early
+ $self->{uri} = PublicInbox::URIimap->new($dst);
+ $self->{uri}->mailbox or die "No mailbox: $dst";
+ $lei->{nwr} = $nwr;
+ $self->{base_type} = 'imap';
} else {
die "bad mail --format=$fmt\n";
}
@@ -394,6 +428,26 @@ sub _do_augment_maildir {
}
}
+sub _augment_imap { # PublicInbox::NetReader::imap_each cb
+ my ($url, $uid, $kw, $eml, $lei) = @_;
+ _augment($eml, $lei);
+}
+
+sub _do_augment_imap {
+ my ($self, $lei) = @_;
+ my $dst = $lei->{ovv}->{dst};
+ my $nwr = $lei->{nwr};
+ if ($lei->{opt}->{augment}) {
+ my $dedupe = $lei->{dedupe};
+ if ($dedupe && $dedupe->prepare_dedupe) {
+ $nwr->imap_each($dst, \&_augment_imap, $lei);
+ $dedupe->pause_dedupe;
+ }
+ } else { # clobber existing IMAP folder
+ $nwr->imap_delete_all($dst);
+ }
+}
+
sub _pre_augment_mbox {
my ($self, $lei) = @_;
my $dst = $lei->{ovv}->{dst};
diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm
index 92d004bc..541094a0 100644
--- a/lib/PublicInbox/NetReader.pm
+++ b/lib/PublicInbox/NetReader.pm
@@ -422,8 +422,13 @@ sub _imap_fetch_all ($$$) {
# uses cached auth info prepared by mic_for
sub mic_get {
my ($self, $sec) = @_;
- my $mic_arg = $self->{mic_arg}->{$sec} or
+ my $mic_arg = $self->{mic_arg}->{$sec};
+ unless ($mic_arg) {
+ my $uri = PublicInbox::URIimap->new($sec);
+ $sec = uri_section($uri);
+ $mic_arg = $self->{mic_arg}->{$sec} or
die "BUG: no Mail::IMAPClient->new arg for $sec";
+ }
if (defined(my $cb_name = $mic_arg->{Authcallback})) {
if (ref($cb_name) ne 'CODE') {
$mic_arg->{Authcallback} = $self->can($cb_name);
diff --git a/lib/PublicInbox/NetWriter.pm b/lib/PublicInbox/NetWriter.pm
index 6f0a0b94..89f8662e 100644
--- a/lib/PublicInbox/NetWriter.pm
+++ b/lib/PublicInbox/NetWriter.pm
@@ -23,4 +23,16 @@ sub imap_append {
die "APPEND $folder: $@";
}
+sub imap_delete_all {
+ my ($self, $url) = @_;
+ my $uri = PublicInbox::URIimap->new($url);
+ my $sec = $self->can('uri_section')->($uri);
+ local $0 = $uri->mailbox." $sec";
+ my $mic = $self->mic_get($sec) or die "E: not connected: $@";
+ $mic->select($uri->mailbox) or return; # non-existent
+ if ($mic->delete_message('1:*')) {
+ $mic->expunge;
+ }
+}
+
1;
diff --git a/xt/net_writer-imap.t b/xt/net_writer-imap.t
index dfd765be..4832245a 100644
--- a/xt/net_writer-imap.t
+++ b/xt/net_writer-imap.t
@@ -7,6 +7,7 @@ use POSIX qw(strftime);
use PublicInbox::OnDestroy;
use PublicInbox::URIimap;
use PublicInbox::Config;
+use Fcntl qw(O_EXCL O_WRONLY O_CREAT);
my $imap_url = $ENV{TEST_IMAP_WRITE_URL} or
plan skip_all => 'TEST_IMAP_WRITE_URL unset';
my $uri = PublicInbox::URIimap->new($imap_url);
@@ -19,30 +20,125 @@ my ($base) = ($0 =~ m!\b([^/]+)\.[^\.]+\z!);
my $folder = "INBOX.$base-$host-".strftime('%Y%m%d%H%M%S', gmtime(time)).
"-$$-".sprintf('%x', int(rand(0xffffffff)));
my $nwr = PublicInbox::NetWriter->new;
-$imap_url .= '/' unless substr($imap_url, -1) eq '/';
+chop($imap_url) if substr($imap_url, -1) eq '/';
my $folder_uri = PublicInbox::URIimap->new("$imap_url/$folder");
is($folder_uri->mailbox, $folder, 'folder correct') or
BAIL_OUT "BUG: bad $$uri";
$nwr->add_url($$folder_uri);
is($nwr->errors, undef, 'no errors');
$nwr->{pi_cfg} = bless {}, 'PublicInbox::Config';
-my $mics = $nwr->imap_common_init;
+
+my $set_cred_helper = sub {
+ my ($f, $cred_set) = @_;
+ sysopen(my $fh, $f, O_CREAT|O_EXCL|O_WRONLY) or BAIL_OUT "open $f: $!";
+ print $fh <<EOF or BAIL_OUT "print $f: $!";
+[credential]
+ helper = $cred_set
+EOF
+ close $fh or BAIL_OUT "close $f: $!";
+};
+
+# allow testers with git-credential-store configured to reuse
+# stored credentials inside test_lei(sub {...}) when $ENV{HOME}
+# is overridden and localized.
+my ($cred_set, @cred_link, $tmpdir, $for_destroy);
+chomp(my $cred_helper = `git config credential.helper 2>/dev/null`);
+if ($cred_helper eq 'store') {
+ my $config = $ENV{XDG_CONFIG_HOME} // "$ENV{HOME}/.config";
+ for my $f ("$ENV{HOME}/.git-credentials", "$config/git/credentials") {
+ next unless -f $f;
+ @cred_link = ($f, '/.git-credentials');
+ last;
+ }
+ $cred_set = qq("$cred_helper");
+} elsif ($cred_helper =~ /\Acache(?:[ \t]|\z)/) {
+ my $cache = $ENV{XDG_CACHE_HOME} // "$ENV{HOME}/.cache";
+ for my $d ("$ENV{HOME}/.git-credential-cache",
+ "$cache/git/credential") {
+ next unless -d $d;
+ @cred_link = ($d, '/.git-credential-cache');
+ $cred_set = qq("$cred_helper");
+ last;
+ }
+} elsif (!$cred_helper) { # make the test less painful if no creds configured
+ ($tmpdir, $for_destroy) = tmpdir;
+ my $d = "$tmpdir/.git-credential-cache";
+ mkdir($d, 0700) or BAIL_OUT $!;
+ $cred_set = "cache --timeout=60";
+ @cred_link = ($d, '/.git-credential-cache');
+} else {
+ diag "credential.helper=$cred_helper will not be used for this test";
+}
+
+my $mics = do {
+ local $ENV{HOME} = $tmpdir // $ENV{HOME};
+ if ($tmpdir && $cred_set) {
+ $set_cred_helper->("$ENV{HOME}/.gitconfig", $cred_set)
+ }
+ $nwr->imap_common_init;
+};
my $mic = (values %$mics)[0];
-my $cleanup = PublicInbox::OnDestroy->new(sub {
+my $cleanup = PublicInbox::OnDestroy->new($$, sub {
+ my $mic = $nwr->mic_get($imap_url);
$mic->delete($folder) or fail "delete $folder <$folder_uri>: $@";
+ if ($tmpdir && -f "$tmpdir/.gitconfig") {
+ local $ENV{HOME} = $tmpdir;
+ system(qw(git credential-cache exit));
+ }
});
my $imap_append = $nwr->can('imap_append');
my $smsg = bless { kw => [ 'seen' ] }, 'PublicInbox::Smsg';
$imap_append->($mic, $folder, undef, $smsg, eml_load('t/plack-qp.eml'));
-my @res;
$nwr->{quiet} = 1;
-$nwr->imap_each($$folder_uri, sub {
- my ($u, $uid, $kw, $eml, $arg) = @_;
- push @res, [ $kw, $eml ];
-});
-is(scalar(@res), 1, 'got appended message');
-is_deeply(\@res, [ [ [ 'seen' ], eml_load('t/plack-qp.eml') ] ],
+my $imap_slurp_all = sub {
+ my ($u, $uid, $kw, $eml, $res) = @_;
+ push @$res, [ $kw, $eml ];
+};
+$nwr->imap_each($$folder_uri, $imap_slurp_all, my $res = []);
+is(scalar(@$res), 1, 'got appended message');
+my $plack_qp_eml = eml_load('t/plack-qp.eml');
+is_deeply($res, [ [ [ 'seen' ], $plack_qp_eml ] ],
'uploaded message read back');
+$res = $mic = $mics = undef;
+
+test_lei(sub {
+ my ($ro_home, $cfg_path) = setup_public_inboxes;
+ my $cfg = PublicInbox::Config->new($cfg_path);
+ $cfg->each_inbox(sub {
+ my ($ibx) = @_;
+ lei_ok qw(add-external -q), $ibx->{inboxdir} or BAIL_OUT;
+ });
+
+ # cred_link[0] may be on a different (hopefully encrypted) FS,
+ # we only symlink to it here, so we don't copy any sensitive data
+ # into the temporary directory
+ if (@cred_link && !symlink($cred_link[0], $ENV{HOME}.$cred_link[1])) {
+ diag "symlink @cred_link: $! (non-fatal)";
+ $cred_set = undef;
+ }
+ $set_cred_helper->("$ENV{HOME}/.gitconfig", $cred_set) if $cred_set;
+
+ lei_ok qw(q f:qp@example.com -o), $$folder_uri;
+ $nwr->imap_each($$folder_uri, $imap_slurp_all, my $res = []);
+ is(scalar(@$res), 1, 'got one deduped result') or diag explain($res);
+ is_deeply($res->[0]->[1], $plack_qp_eml,
+ 'lei q wrote expected result');
+
+ lei_ok qw(q f:matz -a -o), $$folder_uri;
+ $nwr->imap_each($$folder_uri, $imap_slurp_all, my $aug = []);
+ is(scalar(@$aug), 2, '2 results after augment') or diag explain($aug);
+ my $exp = $res->[0]->[1]->as_string;
+ is(scalar(grep { $_->[1]->as_string eq $exp } @$aug), 1,
+ 'original remains after augment');
+ $exp = eml_load('t/iso-2202-jp.eml')->as_string;
+ is(scalar(grep { $_->[1]->as_string eq $exp } @$aug), 1,
+ 'new result shown after augment');
+
+ lei_ok qw(q s:thisbetternotgiveanyresult -o), $folder_uri->as_string;
+ $nwr->imap_each($$folder_uri, $imap_slurp_all, my $empty = []);
+ is(scalar(@$empty), 0, 'no results w/o augment');
+
+});
-undef $cleanup;
+undef $cleanup; # remove temporary folder
done_testing;
* [PATCH 3/7] ipc: add wq_broadcast
2021-02-21 7:41 [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
2021-02-21 7:41 ` [PATCH 1/7] inbox_writable: require PublicInbox::MdirReader Eric Wong
2021-02-21 7:41 ` [PATCH 2/7] lei q: support IMAP/IMAPS --output destinations Eric Wong
@ 2021-02-21 7:41 ` Eric Wong
2021-02-21 7:41 ` [PATCH 4/7] lei q: move augment into lei2mail workers Eric Wong
` (3 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-02-21 7:41 UTC
To: meta
We'll give workqueues a broadcast mechanism to ensure all
workers see a certain message. We'll also tag each worker
with {-wq_worker_nr} in preparation for work distribution.
This is intended to avoid extra connection and fork() costs
from LeiAuth in a future commit.
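
The broadcast idea can be sketched outside of PublicInbox::IPC as
follows: the parent keeps one private socketpair per worker and sends
the same message down each of them, so every worker is guaranteed to
see it (unlike the shared workqueue socket, where only one worker
picks up a given request). broadcast_demo is a hypothetical function
for illustration only. It assumes a platform with AF_UNIX
SOCK_SEQPACKET support (e.g. Linux), and each worker here handles a
single message and exits, which is a simplification.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR);

# Hypothetical demo: fork $nr workers, send $msg to each one over its
# own socketpair, and return the replies collected by the parent.
sub broadcast_demo {
	my ($nr, $msg) = @_;
	my %workers; # pid => parent end of that worker's socketpair
	for my $worker_nr (0 .. $nr - 1) {
		socketpair(my $parent_end, my $child_end,
				AF_UNIX, SOCK_SEQPACKET, 0) or
			die "socketpair: $!";
		my $pid = fork // die "fork: $!";
		if ($pid == 0) { # worker
			close $parent_end;
			close $_ for values %workers; # inherited parent ends
			defined(recv($child_end, my $buf, 65536, 0)) or
				die "recv: $!";
			send($child_end, "worker $worker_nr got: $buf",
				MSG_EOR) // die "send: $!";
			exit 0;
		}
		close $child_end;
		$workers{$pid} = $parent_end;
	}
	my @replies;
	for my $pid (sort { $a <=> $b } keys %workers) {
		my $s = $workers{$pid};
		send($s, $msg, MSG_EOR) // die "send: $!"; # the broadcast
		defined(recv($s, my $reply, 65536, 0)) or die "recv: $!";
		push @replies, $reply;
		waitpid($pid, 0);
	}
	@replies;
}
```

Calling broadcast_demo(2, 'hello') should yield one reply per worker,
each tagged with that worker's number.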
---
lib/PublicInbox/IPC.pm | 30 ++++++++++++++++++++++++----
lib/PublicInbox/WQWorker.pm | 8 ++++----
t/ipc.t | 39 ++++++++++++++++++++++---------------
3 files changed, 53 insertions(+), 24 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index efac4c4d..2aeb6462 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -240,8 +240,9 @@ sub recv_and_run {
}
sub wq_worker_loop ($) {
- my ($self) = @_;
+ my ($self, $bcast_a) = @_;
my $wqw = PublicInbox::WQWorker->new($self);
+ PublicInbox::WQWorker->new($self, '-wq_bcast2');
PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
PublicInbox::DS->EventLoop;
PublicInbox::DS->Reset;
@@ -252,6 +253,20 @@ sub do_sock_stream { # via wq_io_do, for big requests
recv_and_run($self, delete $self->{0}, $len, 1);
}
+sub wq_broadcast {
+ my ($self, $sub, @args) = @_;
+ if (my $wkr = $self->{-wq_workers}) {
+ for my $bcast1 (values %$wkr) {
+ my $buf = ipc_freeze([$sub, @args]);
+ send($bcast1, $buf, MSG_EOR) // croak "send: $!";
+ # XXX shouldn't have to deal with EMSGSIZE here...
+ }
+ } else {
+ eval { $self->$sub(@args) };
+ warn "wq_broadcast: $@" if $@;
+ }
+}
+
sub wq_io_do { # always async
my ($self, $sub, $ios, @args) = @_;
if (my $s1 = $self->{-wq_s1}) { # run in worker
@@ -284,15 +299,21 @@ sub wq_io_do { # always async
sub _wq_worker_start ($$$) {
my ($self, $oldset, $fields) = @_;
+ my ($bcast1, $bcast2);
+ socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or
+ die "socketpair: $!";
my $seed = rand(0xffffffff);
my $pid = fork // die "fork: $!";
if ($pid == 0) {
srand($seed);
+ undef $bcast1;
eval { PublicInbox::DS->Reset };
- delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
+ delete @$self{qw(-wq_s1 -wq_ppid)};
+ $self->{-wq_worker_nr} =
+ keys %{delete($self->{-wq_workers}) // {}};
$SIG{$_} = 'IGNORE' for (qw(PIPE));
$SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
- local $0 = $self->{-wq_ident};
+ local $0 = "$self->{-wq_ident} $self->{-wq_worker_nr}";
# ensure we properly exit even if warn() dies:
my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
eval {
@@ -301,12 +322,13 @@ sub _wq_worker_start ($$$) {
my $on_destroy = $self->ipc_atfork_child;
local %SIG = %SIG;
PublicInbox::DS::sig_setmask($oldset);
+ $self->{-wq_bcast2} = $bcast2;
wq_worker_loop($self);
};
warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
undef $end; # trigger exit
} else {
- $self->{-wq_workers}->{$pid} = \undef;
+ $self->{-wq_workers}->{$pid} = $bcast1;
}
}
diff --git a/lib/PublicInbox/WQWorker.pm b/lib/PublicInbox/WQWorker.pm
index 25a5e4fb..3636321e 100644
--- a/lib/PublicInbox/WQWorker.pm
+++ b/lib/PublicInbox/WQWorker.pm
@@ -11,10 +11,10 @@ use Errno qw(EAGAIN ECONNRESET);
use IO::Handle (); # blocking
sub new {
- my (undef, $wq) = @_;
- my $s2 = $wq->{-wq_s2} // die 'BUG: no -wq_s2';
+ my ($cls, $wq, $field) = @_;
+ my $s2 = $wq->{$field // '-wq_s2'} // die "BUG: no {$field}";
$s2->blocking(0);
- my $self = bless { sock => $s2, wq => $wq }, __PACKAGE__;
+ my $self = bless { sock => $s2, wq => $wq }, $cls;
$self->SUPER::new($s2, EPOLLEXCLUSIVE|EPOLLIN|EPOLLET);
$self;
}
@@ -27,7 +27,7 @@ sub event_step {
} while ($n);
return if !defined($n) && $! == EAGAIN; # likely
warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET;
- $self->{wq}->wq_atexit_child;
+ $self->{wq}->wq_atexit_child if $self->{sock} == $self->{wq}->{-wq_s2};
$self->close; # PublicInbox::DS::close
}
diff --git a/t/ipc.t b/t/ipc.t
index 345024bd..ca88eb59 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -9,6 +9,7 @@ use Fcntl qw(SEEK_SET);
use Digest::SHA qw(sha1_hex);
require_mods(qw(Storable||Sereal));
require_ok 'PublicInbox::IPC';
+my ($tmpdir, $for_destroy) = tmpdir();
state $once = eval <<'';
package PublicInbox::IPC;
use strict;
@@ -31,6 +32,12 @@ sub test_sha {
print { $self->{1} } sha1_hex($buf), "\n";
$self->{1}->flush;
}
+sub test_append_pid {
+ my ($self, $file) = @_;
+ open my $fh, '>>', $file or die "open: $!";
+ $fh->autoflush(1);
+ print $fh "$$\n" or die "print: $!";
+}
1;
my $ipc = bless {}, 'PublicInbox::IPC';
@@ -83,11 +90,8 @@ $test->('local');
defined($pid) or BAIL_OUT 'no spawn, no test';
is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
$test->('worker');
- {
- my ($tmp, $for_destroy) = tmpdir();
- $ipc->ipc_lock_init("$tmp/lock");
- is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
- }
+ $ipc->ipc_lock_init("$tmpdir/lock");
+ is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
$ipc->ipc_worker_stop;
ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped');
}
@@ -167,18 +171,21 @@ SKIP: {
$SIG{__WARN__} = 'DEFAULT';
is($ipc->wq_workers_start('wq', 1), $$, 'workers started again');
is($ipc->wq_workers, 1, '1 worker started');
- SKIP: {
- $ipc->WQ_MAX_WORKERS > 1 or
- skip 'Inline::C or Socket::MsgHdr not available', 4;
- $ipc->wq_worker_incr;
- is($ipc->wq_workers, 2, 'worker count bumped');
- $ipc->wq_worker_decr;
- $ipc->wq_worker_decr_wait(10);
- is($ipc->wq_workers, 1, 'worker count lowered');
- is($ipc->wq_workers(2), 2, 'worker count set');
- is($ipc->wq_workers, 2, 'worker count stayed set');
- }
+
+ $ipc->wq_worker_incr;
+ is($ipc->wq_workers, 2, 'worker count bumped');
+ $ipc->wq_worker_decr;
+ $ipc->wq_worker_decr_wait(10);
+ is($ipc->wq_workers, 1, 'worker count lowered');
+ is($ipc->wq_workers(2), 2, 'worker count set');
+ is($ipc->wq_workers, 2, 'worker count stayed set');
+
+ $ipc->wq_broadcast('test_append_pid', "$tmpdir/append_pid");
$ipc->wq_close;
+ open my $fh, '<', "$tmpdir/append_pid" or BAIL_OUT "open: $!";
+ chomp(my @pids = <$fh>);
+ my %pids = map { $_ => 1 } grep(/\A[0-9]+\z/, @pids);
+ is(scalar keys %pids, 2, 'broadcast hit both PIDs');
is($ipc->wq_workers, undef, 'workers undef after close');
}
* [PATCH 4/7] lei q: move augment into lei2mail workers
2021-02-21 7:41 [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
` (2 preceding siblings ...)
2021-02-21 7:41 ` [PATCH 3/7] ipc: add wq_broadcast Eric Wong
@ 2021-02-21 7:41 ` Eric Wong
2021-02-21 7:41 ` [PATCH 5/7] ipc: support setting a locked number of WQ workers Eric Wong
` (2 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-02-21 7:41 UTC
To: meta
This is a step which will allow us to parallelize augment
on Maildir and IMAP.
---
lib/PublicInbox/LeiToMail.pm | 10 +++++++++-
lib/PublicInbox/LeiXSearch.pm | 18 ++++--------------
t/lei-externals.t | 3 ++-
3 files changed, 15 insertions(+), 16 deletions(-)
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 0e0b0a43..e5398912 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -14,6 +14,7 @@ use PublicInbox::LeiDedupe;
use PublicInbox::OnDestroy;
use PublicInbox::Git;
use PublicInbox::GitAsyncCat;
+use PublicInbox::PktOp qw(pkt_do);
use Symbol qw(gensym);
use IO::Handle; # ->autoflush
use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
@@ -499,7 +500,7 @@ sub pre_augment { # fast (1 disk seek), runs in same process as post_augment
sub do_augment { # slow, runs in wq worker
my ($self, $lei) = @_;
- # _do_augment_maildir, _do_augment_mbox
+ # _do_augment_maildir, _do_augment_mbox, or _do_augment_imap
my $m = "_do_augment_$self->{base_type}";
$self->$m($lei);
}
@@ -516,6 +517,13 @@ sub ipc_atfork_child {
my ($self) = @_;
my $lei = delete $self->{lei};
$lei->lei_atfork_child;
+ if ($self->{-wq_worker_nr} == 0) {
+ local $0 = 'do_augment';
+ eval { do_augment($self, $lei) };
+ $lei->fail($@) if $@;
+ pkt_do($lei->{pkt_op_p}, '.') == 1 or
+ die "do_post_augment trigger: $!";
+ }
if (my $zpipe = delete $lei->{zpipe}) {
$lei->{1} = $zpipe->[1];
close $zpipe->[0];
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 10485220..a319b75f 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -99,21 +99,21 @@ sub _mset_more ($$) {
$size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit});
}
-# $startq will EOF when query_prepare is done augmenting and allow
+# $startq will EOF when do_augment is done augmenting and allow
# query_mset and query_thread_mset to proceed.
sub wait_startq ($) {
my ($lei) = @_;
my $startq = delete $lei->{startq} or return;
while (1) {
- my $n = sysread($startq, my $query_prepare_done, 1);
+ my $n = sysread($startq, my $do_augment_done, 1);
if (defined $n) {
return if $n == 0; # no MUA
- if ($query_prepare_done eq 'q') {
+ if ($do_augment_done eq 'q') {
$lei->{opt}->{quiet} = 1;
delete $lei->{opt}->{verbose};
delete $lei->{-progress};
} else {
- $lei->fail("$$ WTF `$query_prepare_done'");
+ $lei->fail("$$ WTF `$do_augment_done'");
}
return;
}
@@ -386,15 +386,6 @@ sub ipc_atfork_child {
$self->SUPER::ipc_atfork_child;
}
-sub query_prepare { # called by wq_io_do
- my ($self) = @_;
- local $0 = "$0 query_prepare";
- my $lei = $self->{lei};
- eval { $lei->{l2m}->do_augment($lei) };
- $lei->fail($@) if $@;
- pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!"
-}
-
sub do_query {
my ($self, $lei) = @_;
my $ops = {
@@ -433,7 +424,6 @@ sub do_query {
delete $lei->{pkt_op_p};
$l2m->wq_close(1) if $l2m;
$lei->event_step_init; # wait for shutdowns
- $self->wq_io_do('query_prepare', []) if $l2m; # for augment/dedupe
start_query($self, $lei);
$self->wq_close(1); # lei_xsearch workers stop when done
if ($lei->{oneshot}) {
diff --git a/t/lei-externals.t b/t/lei-externals.t
index edfbb2bf..02b15232 100644
--- a/t/lei-externals.t
+++ b/t/lei-externals.t
@@ -186,7 +186,8 @@ SKIP: {
my @s = grep(/^Subject:/, $cat->());
is(scalar(@s), 1, "1 result in mbox$sfx");
$lei->('q', '-a', '-o', "mboxcl2:$f", 's:see attachment');
- is(grep(!/^#/, $lei_err), 0, 'no errors from augment');
+ is(grep(!/^#/, $lei_err), 0, 'no errors from augment') or
+ diag $lei_err;
@s = grep(/^Subject:/, my @wtf = $cat->());
is(scalar(@s), 2, "2 results in mbox$sfx");
* [PATCH 5/7] ipc: support setting a locked number of WQ workers
2021-02-21 7:41 [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
` (3 preceding siblings ...)
2021-02-21 7:41 ` [PATCH 4/7] lei q: move augment into lei2mail workers Eric Wong
@ 2021-02-21 7:41 ` Eric Wong
2021-02-21 7:41 ` [PATCH 6/7] net_reader: use and accept URIimap objects in more places Eric Wong
2021-02-21 7:41 ` [PATCH 7/7] lei2mail: parallel augment for lock-free stores Eric Wong
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-02-21 7:41 UTC
To: meta
We can use this to ensure sharded work doesn't do unexpected
things if workers are added/removed. We currently don't
increase/decrease workers once a workqueue is started, but
non-lei code (-httpd/imapd) may start doing so.
This also fixes a bug where lei2mail workers could not
be adjusted via --jobs on the command-line.
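
The locking rule can be sketched with a toy class: once a worker
count has been fixed up front, later attempts to grow or shrink the
pool die instead of silently changing how work is sharded. LockedPool
and its locked_nr field are hypothetical names standing in for the
workqueue object and its {-wq_nr_workers} field; dying when no count
is available at all is an assumption of this sketch, not something
the patch does.

```perl
use strict;
use warnings;

package LockedPool; # hypothetical stand-in for a workqueue object

sub new {
	my ($cls, %opt) = @_;
	# locked_nr mirrors {-wq_nr_workers}: undef means "not locked"
	bless { locked_nr => $opt{locked_nr}, nr => 0 }, $cls;
}

sub start { # an explicit count wins, else fall back to the locked one
	my ($self, $nr) = @_;
	$nr //= $self->{locked_nr} // die "no worker count given\n";
	$self->{nr} = $nr;
}

sub incr {
	my ($self) = @_;
	die "worker count locked\n" if defined $self->{locked_nr};
	++$self->{nr};
}

sub decr {
	my ($self) = @_;
	die "worker count locked\n" if defined $self->{locked_nr};
	--$self->{nr} if $self->{nr} > 0;
	$self->{nr};
}

package main;
```

A pool created with LockedPool->new(locked_nr => 3) starts three
workers via ->start and refuses ->incr and ->decr, while a pool
created without locked_nr can still be resized.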
---
lib/PublicInbox/IPC.pm | 5 ++++-
lib/PublicInbox/LeiQuery.pm | 6 +++---
lib/PublicInbox/LeiXSearch.pm | 4 ++--
3 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 2aeb6462..1fa67d00 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -341,7 +341,7 @@ sub wq_workers_start {
socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or
die "socketpair: $!";
$self->ipc_atfork_prepare;
- $nr_workers //= 4;
+ $nr_workers //= $self->{-wq_nr_workers};
$nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
my $sigset = $oldset // PublicInbox::DS::block_signals();
$self->{-wq_workers} = {};
@@ -354,6 +354,7 @@ sub wq_workers_start {
sub wq_worker_incr { # SIGTTIN handler
my ($self, $oldset, $fields) = @_;
$self->{-wq_s2} or return;
+ die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
return if wq_workers($self) >= $WQ_MAX_WORKERS;
$self->ipc_atfork_prepare;
my $sigset = $oldset // PublicInbox::DS::block_signals();
@@ -369,6 +370,7 @@ sub wq_exit { # wakes up wq_worker_decr_wait
sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
my ($self) = @_;
return unless wq_workers($self);
+ die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
$self->wq_io_do('wq_exit');
# caller must call wq_worker_decr_wait in main loop
}
@@ -376,6 +378,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
sub wq_worker_decr_wait {
my ($self, $timeout, $cb, @args) = @_;
return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
+ die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
vec(my $rin = '', fileno($s1), 1) = 1;
select(my $rout = $rin, undef, undef, $timeout) or
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index eaf91f2e..398f834f 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -90,14 +90,14 @@ sub lei_q {
return $self->fail("`$xj' search jobs must be >= 1");
}
$xj ||= $lxs->concurrency($opt); # allow: "--jobs ,$WRITER_ONLY"
- my $nproc = $lxs->detect_nproc; # don't memoize, schedtool(1) exists
+ my $nproc = $lxs->detect_nproc // 1; # don't memoize, schedtool(1) exists
$xj = $nproc if $xj > $nproc;
- $lxs->{jobs} = $xj;
+ $lxs->{-wq_nr_workers} = $xj;
if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) {
return $self->fail("`$mj' writer jobs must be >= 1");
}
- $self->{l2m}->{jobs} = ($mj // $nproc) if $self->{l2m};
PublicInbox::LeiOverview->new($self) or return;
+ $self->{l2m}->{-wq_nr_workers} = ($mj // $nproc) if $self->{l2m};
my %mset_opt = map { $_ => $opt->{$_} } qw(threads limit offset);
$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index a319b75f..524f4d1c 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -407,7 +407,7 @@ sub do_query {
if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
$lei->start_mua;
}
- $l2m->wq_workers_start('lei2mail', $l2m->{jobs},
+ $l2m->wq_workers_start('lei2mail', undef,
$lei->oldset, { lei => $lei });
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
# 1031: F_SETPIPE_SZ
@@ -418,7 +418,7 @@ sub do_query {
# delete until all lei2mail + lei_xsearch workers are reaped
$lei->{git_tmp} = $self->{git_tmp} = git_tmp($self);
}
- $self->wq_workers_start('lei_xsearch', $self->{jobs},
+ $self->wq_workers_start('lei_xsearch', undef,
$lei->oldset, { lei => $lei });
my $op = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
* [PATCH 6/7] net_reader: use and accept URIimap objects in more places
2021-02-21 7:41 [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
` (4 preceding siblings ...)
2021-02-21 7:41 ` [PATCH 5/7] ipc: support setting a locked number of WQ workers Eric Wong
@ 2021-02-21 7:41 ` Eric Wong
2021-02-21 7:41 ` [PATCH 7/7] lei2mail: parallel augment for lock-free stores Eric Wong
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-02-21 7:41 UTC
To: meta
This flexibility should save us some code down the line.
---
lib/PublicInbox/LeiToMail.pm | 7 ++--
lib/PublicInbox/NetReader.pm | 63 +++++++++++++++++-------------------
lib/PublicInbox/Watch.pm | 11 ++++---
xt/net_writer-imap.t | 10 +++---
4 files changed, 44 insertions(+), 47 deletions(-)
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index e5398912..b5d560c7 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -337,7 +337,7 @@ sub _imap_write_cb ($$) {
my $dedupe = $lei->{dedupe};
$dedupe->prepare_dedupe if $dedupe;
my $imap_append = $lei->{nwr}->can('imap_append');
- my $mic = $lei->{nwr}->mic_get($lei->{ovv}->{dst});
+ my $mic = $lei->{nwr}->mic_get($self->{uri});
my $folder = $self->{uri}->mailbox;
sub { # for git_to_mail
my ($bref, $smsg, $eml) = @_;
@@ -436,16 +436,15 @@ sub _augment_imap { # PublicInbox::NetReader::imap_each cb
sub _do_augment_imap {
my ($self, $lei) = @_;
- my $dst = $lei->{ovv}->{dst};
my $nwr = $lei->{nwr};
if ($lei->{opt}->{augment}) {
my $dedupe = $lei->{dedupe};
if ($dedupe && $dedupe->prepare_dedupe) {
- $nwr->imap_each($dst, \&_augment_imap, $lei);
+ $nwr->imap_each($self->{uri}, \&_augment_imap, $lei);
$dedupe->pause_dedupe;
}
} else { # clobber existing IMAP folder
- $nwr->imap_delete_all($dst);
+ $nwr->imap_delete_all($self->{uri});
}
}
diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm
index 541094a0..4c412491 100644
--- a/lib/PublicInbox/NetReader.pm
+++ b/lib/PublicInbox/NetReader.pm
@@ -12,8 +12,8 @@ our %IMAPflags2kw = map {; "\\\u$_" => $_ } qw(seen answered flagged draft);
# TODO: trim this down, this is huge
our @EXPORT = qw(uri_new uri_scheme uri_section
- mic_for nn_new nn_for
- imap_url nntp_url
+ nn_new nn_for
+ imap_uri nntp_url
cfg_bool cfg_intvl imap_common_init
);
@@ -213,11 +213,11 @@ W: see https://rt.cpan.org/Ticket/Display.html?id=129967 for updates
$nn;
}
-sub imap_url {
+sub imap_uri {
my ($url) = @_;
require PublicInbox::URIimap;
my $uri = PublicInbox::URIimap->new($url);
- $uri ? $uri->canonical->as_string : undef;
+ $uri ? $uri->canonical : undef;
}
my %IS_NNTP = (news => 1, snews => 1, nntp => 1);
@@ -262,21 +262,20 @@ sub imap_common_init ($;$) {
require PublicInbox::URIimap;
my $cfg = $self->{pi_cfg} // $lei->_lei_cfg;
my $mic_args = {}; # scheme://authority => Mail:IMAPClient arg
- for my $url (@{$self->{imap_order}}) {
- my $uri = PublicInbox::URIimap->new($url);
+ for my $uri (@{$self->{imap_order}}) {
my $sec = uri_section($uri);
for my $k (qw(Starttls Debug Compress)) {
- my $bool = cfg_bool($cfg, "imap.$k", $url) // next;
+ my $bool = cfg_bool($cfg, "imap.$k", $$uri) // next;
$mic_args->{$sec}->{$k} = $bool;
}
- my $to = cfg_intvl($cfg, 'imap.timeout', $url);
+ my $to = cfg_intvl($cfg, 'imap.timeout', $$uri);
$mic_args->{$sec}->{Timeout} = $to if $to;
for my $k (qw(pollInterval idleInterval)) {
- $to = cfg_intvl($cfg, "imap.$k", $url) // next;
+ $to = cfg_intvl($cfg, "imap.$k", $$uri) // next;
$self->{imap_opt}->{$sec}->{$k} = $to;
}
my $k = 'imap.fetchBatchSize';
- my $bs = $cfg->urlmatch($k, $url) // next;
+ my $bs = $cfg->urlmatch($k, $$uri) // next;
if ($bs =~ /\A([0-9]+)\z/) {
$self->{imap_opt}->{$sec}->{batch_size} = $bs;
} else {
@@ -286,23 +285,22 @@ sub imap_common_init ($;$) {
# make sure we can connect and cache the credentials in memory
$self->{mic_arg} = {}; # schema://authority => IMAPClient->new args
my $mics = {}; # schema://authority => IMAPClient obj
- for my $url (@{$self->{imap_order}}) {
- my $uri = PublicInbox::URIimap->new($url);
+ for my $uri (@{$self->{imap_order}}) {
my $sec = uri_section($uri);
$mics->{$sec} //= mic_for($self, "$sec/", $mic_args, $lei);
next unless $self->isa('PublicInbox::NetWriter');
my $dst = $uri->mailbox // next;
my $mic = $mics->{$sec};
next if $mic->exists($dst); # already exists
- $mic->create($dst) or die "CREATE $dst failed <$url>: $@";
+ $mic->create($dst) or die "CREATE $dst failed <$uri>: $@";
}
$mics;
}
sub add_url {
my ($self, $arg) = @_;
- if (my $url = imap_url($arg)) {
- push @{$self->{imap_order}}, $url;
+ if (my $uri = imap_uri($arg)) {
+ push @{$self->{imap_order}}, $uri;
} else {
push @{$self->{unsupported_url}}, $arg;
}
@@ -321,7 +319,7 @@ sub errors {
}
sub _imap_do_msg ($$$$$) {
- my ($self, $url, $uid, $raw, $flags) = @_;
+ my ($self, $uri, $uid, $raw, $flags) = @_;
# our target audience expects LF-only, save storage
$$raw =~ s/\r\n/\n/sg;
my $kw = [];
@@ -330,12 +328,11 @@ sub _imap_do_msg ($$$$$) {
push @$kw, $k;
}
my ($eml_cb, @args) = @{$self->{eml_each}};
- $eml_cb->($url, $uid, $kw, PublicInbox::Eml->new($raw), @args);
+ $eml_cb->($uri, $uid, $kw, PublicInbox::Eml->new($raw), @args);
}
sub _imap_fetch_all ($$$) {
- my ($self, $mic, $url) = @_;
- my $uri = PublicInbox::URIimap->new($url);
+ my ($self, $mic, $uri) = @_;
my $sec = uri_section($uri);
my $mbx = $uri->mailbox;
$mic->Clear(1); # trim results history
@@ -347,27 +344,27 @@ sub _imap_fetch_all ($$$) {
last if $r_uidval && $r_uidnext;
}
$r_uidval //= $mic->uidvalidity($mbx) //
- return "E: $url cannot get UIDVALIDITY";
+ return "E: $uri cannot get UIDVALIDITY";
$r_uidnext //= $mic->uidnext($mbx) //
- return "E: $url cannot get UIDNEXT";
+ return "E: $uri cannot get UIDNEXT";
my $itrk = $self->{incremental} ?
- PublicInbox::IMAPTracker->new($url) : 0;
+ PublicInbox::IMAPTracker->new($$uri) : 0;
my ($l_uidval, $l_uid) = $itrk ? $itrk->get_last : ();
$l_uidval //= $r_uidval; # first time
$l_uid //= 0;
if ($l_uidval != $r_uidval) {
- return "E: $url UIDVALIDITY mismatch\n".
+ return "E: $uri UIDVALIDITY mismatch\n".
"E: local=$l_uidval != remote=$r_uidval";
}
my $r_uid = $r_uidnext - 1;
if ($l_uid > $r_uid) {
- return "E: $url local UID exceeds remote ($l_uid > $r_uid)\n".
- "E: $url strangely, UIDVALIDLITY matches ($l_uidval)\n";
+ return "E: $uri local UID exceeds remote ($l_uid > $r_uid)\n".
+ "E: $uri strangely, UIDVALIDLITY matches ($l_uidval)\n";
}
return if $l_uid >= $r_uid; # nothing to do
$l_uid ||= 1;
- warn "# $url fetching UID $l_uid:$r_uid\n" unless $self->{quiet};
+ warn "# $uri fetching UID $l_uid:$r_uid\n" unless $self->{quiet};
$mic->Uid(1); # the default, we hope
my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1;
my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK';
@@ -380,7 +377,7 @@ sub _imap_fetch_all ($$$) {
# 1) servers do not need to return results in any order
# 2) Mail::IMAPClient doesn't offer a streaming API
$uids = $mic->search("UID $l_uid:*") or
- return "E: $url UID SEARCH $l_uid:* error: $!";
+ return "E: $uri UID SEARCH $l_uid:* error: $!";
return if scalar(@$uids) == 0;
# RFC 3501 doesn't seem to indicate order of UID SEARCH
@@ -400,14 +397,14 @@ sub _imap_fetch_all ($$$) {
local $0 = "UID:$batch $mbx $sec";
my $r = $mic->fetch_hash($batch, $req, 'FLAGS');
unless ($r) { # network error?
- $err = "E: $url UID FETCH $batch error: $!";
+ $err = "E: $uri UID FETCH $batch error: $!";
last;
}
for my $uid (@batch) {
# messages get deleted, so holes appear
my $per_uid = delete $r->{$uid} // next;
my $raw = delete($per_uid->{$key}) // next;
- _imap_do_msg($self, $url, $uid, \$raw,
+ _imap_do_msg($self, $uri, $uid, \$raw,
$per_uid->{FLAGS});
$last_uid = $uid;
last if $self->{quit};
@@ -424,7 +421,7 @@ sub mic_get {
my ($self, $sec) = @_;
my $mic_arg = $self->{mic_arg}->{$sec};
unless ($mic_arg) {
- my $uri = PublicInbox::URIimap->new($sec);
+ my $uri = ref $sec ? $sec : PublicInbox::URIimap->new($sec);
$sec = uri_section($uri);
$mic_arg = $self->{mic_arg}->{$sec} or
die "BUG: no Mail::IMAPClient->new arg for $sec";
@@ -440,14 +437,14 @@ sub mic_get {
sub imap_each {
my ($self, $url, $eml_cb, @args) = @_;
- my $uri = PublicInbox::URIimap->new($url);
+ my $uri = ref($url) ? $url : PublicInbox::URIimap->new($url);
my $sec = uri_section($uri);
local $0 = $uri->mailbox." $sec";
- my $mic = mic_get($self, $sec);
+ my $mic = mic_get($self, $uri);
my $err;
if ($mic) {
local $self->{eml_each} = [ $eml_cb, @args ];
- $err = _imap_fetch_all($self, $mic, $url);
+ $err = _imap_fetch_all($self, $mic, $uri);
} else {
$err = "E: not connected: $!";
}
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index c64689a1..8d13ea35 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -60,9 +60,9 @@ sub new {
if (is_maildir($dir)) {
# skip "new", no MUA has seen it, yet.
$mdmap{"$dir/cur"} = 'watchspam';
- } elsif ($url = imap_url($dir)) {
- $imap{$url} = 'watchspam';
- push @imap, $url;
+ } elsif (my $uri = imap_uri($dir)) {
+ $imap{$$uri} = 'watchspam';
+ push @imap, $uri;
} elsif ($url = nntp_url($dir)) {
$nntp{$url} = 'watchspam';
push @nntp, $url;
@@ -92,11 +92,12 @@ sub new {
return if is_watchspam($cur, $cur_dst, $ibx);
push @{$mdmap{$new} //= []}, $ibx;
push @$cur_dst, $ibx;
- } elsif ($url = imap_url($watch)) {
+ } elsif (my $uri = imap_uri($watch)) {
+ my $url = $$uri;
return if is_watchspam($url, $imap{$url}, $ibx);
compile_watchheaders($ibx);
my $n = push @{$imap{$url} ||= []}, $ibx;
- push @imap, $url if $n == 1;
+ push @imap, $uri if $n == 1;
} elsif ($url = nntp_url($watch)) {
return if is_watchspam($url, $nntp{$url}, $ibx);
compile_watchheaders($ibx);
diff --git a/xt/net_writer-imap.t b/xt/net_writer-imap.t
index 4832245a..64f822cf 100644
--- a/xt/net_writer-imap.t
+++ b/xt/net_writer-imap.t
@@ -79,7 +79,7 @@ my $mics = do {
};
my $mic = (values %$mics)[0];
my $cleanup = PublicInbox::OnDestroy->new($$, sub {
- my $mic = $nwr->mic_get($imap_url);
+ my $mic = $nwr->mic_get($uri);
$mic->delete($folder) or fail "delete $folder <$folder_uri>: $@";
if ($tmpdir && -f "$tmpdir/.gitconfig") {
local $ENV{HOME} = $tmpdir;
@@ -94,7 +94,7 @@ my $imap_slurp_all = sub {
my ($u, $uid, $kw, $eml, $res) = @_;
push @$res, [ $kw, $eml ];
};
-$nwr->imap_each($$folder_uri, $imap_slurp_all, my $res = []);
+$nwr->imap_each($folder_uri, $imap_slurp_all, my $res = []);
is(scalar(@$res), 1, 'got appended message');
my $plack_qp_eml = eml_load('t/plack-qp.eml');
is_deeply($res, [ [ [ 'seen' ], $plack_qp_eml ] ],
@@ -119,13 +119,13 @@ test_lei(sub {
$set_cred_helper->("$ENV{HOME}/.gitconfig", $cred_set) if $cred_set;
lei_ok qw(q f:qp@example.com -o), $$folder_uri;
- $nwr->imap_each($$folder_uri, $imap_slurp_all, my $res = []);
+ $nwr->imap_each($folder_uri, $imap_slurp_all, my $res = []);
is(scalar(@$res), 1, 'got one deduped result') or diag explain($res);
is_deeply($res->[0]->[1], $plack_qp_eml,
'lei q wrote expected result');
lei_ok qw(q f:matz -a -o), $$folder_uri;
- $nwr->imap_each($$folder_uri, $imap_slurp_all, my $aug = []);
+ $nwr->imap_each($folder_uri, $imap_slurp_all, my $aug = []);
is(scalar(@$aug), 2, '2 results after augment') or diag explain($aug);
my $exp = $res->[0]->[1]->as_string;
is(scalar(grep { $_->[1]->as_string eq $exp } @$aug), 1,
@@ -135,7 +135,7 @@ test_lei(sub {
'new result shown after augment');
lei_ok qw(q s:thisbetternotgiveanyresult -o), $folder_uri->as_string;
- $nwr->imap_each($$folder_uri, $imap_slurp_all, my $empty = []);
+ $nwr->imap_each($folder_uri, $imap_slurp_all, my $empty = []);
is(scalar(@$empty), 0, 'no results w/o augment');
});
* [PATCH 7/7] lei2mail: parallel augment for lock-free stores
2021-02-21 7:41 [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
` (5 preceding siblings ...)
2021-02-21 7:41 ` [PATCH 6/7] net_reader: use and accept URIimap objects in more places Eric Wong
@ 2021-02-21 7:41 ` Eric Wong
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-02-21 7:41 UTC (permalink / raw)
To: meta
Each lei2mail worker now augments only its own shard of the
destination, so -a/--augment dedupe can use multiple cores for
IMAP and for Maildirs backed by SSD (or better) storage.  This
benefits IMAP stores with high network latency, but may still
penalize IMAP servers with rotational storage.
---
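A minimal standalone sketch of the sharding rule, assuming $mod
workers numbered 0..$mod-1.  shard_uids and shard_for_file are
hypothetical helper names; the patch itself inlines this logic in
_imap_fetch_all and _augment_file.

```perl
use strict;
use warnings;
use Digest::SHA qw(sha256_hex);

# Keep only the UIDs belonging to worker $shard out of $mod workers.
sub shard_uids {
	my ($uids, $mod, $shard) = @_;
	return @$uids unless $mod; # no sharding: one worker sees everything
	grep { ($_ % $mod) == $shard } @$uids;
}

# Map a Maildir file name to a shard number.  If the name embeds
# something that looks like a hex OID, use it; otherwise hash the name.
sub shard_for_file {
	my ($f, $mod) = @_;
	my $hex = $f =~ m!\b([a-f0-9]{40,})[^/]*\z! ? $1 : sha256_hex($f);
	hex(substr($hex, 0, 8)) % $mod; # first 32 bits are plenty
}

my @uids = (1..10);
my $mod = 3; # hypothetical worker count
for my $shard (0..$mod - 1) {
	my @mine = shard_uids(\@uids, $mod, $shard);
	print "worker $shard: @mine\n";
}
print "file shard: ", shard_for_file('cur/example:2,S', $mod), "\n";
```

Every UID or file lands in exactly one shard without any
coordination between workers.  The parent still has to wait for
all shards: in this patch it counts one '+' packet per worker in
incr_post_augment before calling do_post_augment.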
lib/PublicInbox/LeiToMail.pm | 32 ++++++++++++++++++++++++++++----
lib/PublicInbox/LeiXSearch.pm | 26 ++++++++++++++++----------
lib/PublicInbox/NetReader.pm | 9 +++++++--
3 files changed, 51 insertions(+), 16 deletions(-)
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index b5d560c7..6efd398a 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -19,6 +19,7 @@ use Symbol qw(gensym);
use IO::Handle; # ->autoflush
use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
use Errno qw(EEXIST ESPIPE ENOENT EPIPE);
+use Digest::SHA qw(sha256_hex);
my ($maildir_each_file);
# struggles with short-lived repos, Gcf2Client makes little sense with lei;
@@ -269,7 +270,15 @@ sub _mbox_write_cb ($$) {
}
sub _augment_file { # maildir_each_file cb
- my ($f, $lei) = @_;
+ my ($f, $lei, $mod, $shard) = @_;
+ if ($mod) {
+ # can't get dirent.d_ino w/ pure Perl, so we extract the OID
+ # if it looks like one:
+ my $hex = $f =~ m!\b([a-f0-9]{40,})[^/]*\z! ?
+ $1 : sha256_hex($f);
+ my $recno = hex(substr($hex, 0, 8));
+ return if ($recno % $mod) != $shard;
+ }
my $eml = PublicInbox::InboxWritable::eml_from_path($f) or return;
_augment($eml, $lei);
}
@@ -421,7 +430,9 @@ sub _do_augment_maildir {
if ($lei->{opt}->{augment}) {
my $dedupe = $lei->{dedupe};
if ($dedupe && $dedupe->prepare_dedupe) {
- $maildir_each_file->($dst, \&_augment_file, $lei);
+ my ($mod, $shard) = @{$self->{shard_info} // []};
+ $maildir_each_file->($dst, \&_augment_file,
+ $lei, $mod, $shard);
$dedupe->pause_dedupe;
}
} else { # clobber existing Maildir
@@ -516,11 +527,24 @@ sub ipc_atfork_child {
my ($self) = @_;
my $lei = delete $self->{lei};
$lei->lei_atfork_child;
- if ($self->{-wq_worker_nr} == 0) {
+ my $aug;
+ if (lock_free($self)) {
+ my $mod = $self->{-wq_nr_workers};
+ my $shard = $self->{-wq_worker_nr};
+ if (my $nwr = $lei->{nwr}) {
+ $nwr->{shard_info} = [ $mod, $shard ];
+ } else { # Maildir (MH?)
+ $self->{shard_info} = [ $mod, $shard ];
+ }
+ $aug = '+'; # incr_post_augment
+ } elsif ($self->{-wq_worker_nr} == 0) {
+ $aug = '.'; # do_post_augment
+ }
+ if ($aug) {
local $0 = 'do_augment';
eval { do_augment($self, $lei) };
$lei->fail($@) if $@;
- pkt_do($lei->{pkt_op_p}, '.') == 1 or
+ pkt_do($lei->{pkt_op_p}, $aug) == 1 or
die "do_post_augment trigger: $!";
}
if (my $zpipe = delete $lei->{zpipe}) {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 524f4d1c..e982165f 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -331,18 +331,16 @@ Error closing $lei->{ovv}->{dst}: $!
sub do_post_augment {
my ($lei) = @_;
- my $l2m = $lei->{l2m};
+ my $l2m = $lei->{l2m} or die 'BUG: unexpected do_post_augment';
my $err;
- if ($l2m) {
- eval { $l2m->post_augment($lei) };
- $err = $@;
- if ($err) {
- if (my $lxs = delete $lei->{lxs}) {
- $lxs->wq_kill;
- $lxs->wq_close(0, undef, $lei);
- }
- $lei->fail("$err");
+ eval { $l2m->post_augment($lei) };
+ $err = $@;
+ if ($err) {
+ if (my $lxs = delete $lei->{lxs}) {
+ $lxs->wq_kill;
+ $lxs->wq_close(0, undef, $lei);
}
+ $lei->fail("$err");
}
if (!$err && delete $lei->{early_mua}) { # non-augment case
$lei->start_mua;
@@ -350,6 +348,13 @@ sub do_post_augment {
close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
}
+sub incr_post_augment { # called whenever an l2m shard finishes
+ my ($lei) = @_;
+ my $l2m = $lei->{l2m} or die 'BUG: unexpected incr_post_augment';
+ return if ++$lei->{nr_post_augment} != $l2m->{-wq_nr_workers};
+ do_post_augment($lei);
+}
+
my $MAX_PER_HOST = 4;
sub concurrency {
@@ -392,6 +397,7 @@ sub do_query {
'|' => [ $lei->can('sigpipe_handler'), $lei ],
'!' => [ $lei->can('fail_handler'), $lei ],
'.' => [ \&do_post_augment, $lei ],
+ '+' => [ \&incr_post_augment, $lei ],
'' => [ \&query_done, $lei ],
'mset_progress' => [ \&mset_progress, $lei ],
'x_it' => [ $lei->can('x_it'), $lei ],
diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm
index 4c412491..0956d5da 100644
--- a/lib/PublicInbox/NetReader.pm
+++ b/lib/PublicInbox/NetReader.pm
@@ -363,8 +363,11 @@ sub _imap_fetch_all ($$$) {
}
return if $l_uid >= $r_uid; # nothing to do
$l_uid ||= 1;
-
- warn "# $uri fetching UID $l_uid:$r_uid\n" unless $self->{quiet};
+ my ($mod, $shard) = @{$self->{shard_info} // []};
+ unless ($self->{quiet}) {
+ my $m = $mod ? " [(UID % $mod) == $shard]" : '';
+ warn "# $uri fetching UID $l_uid:$r_uid$m\n";
+ }
$mic->Uid(1); # the default, we hope
my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1;
my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK';
@@ -391,6 +394,8 @@ sub _imap_fetch_all ($$$) {
$l_uid = $uids->[-1] + 1; # for next search
my $last_uid;
my $n = $self->{max_batch};
+
+ @$uids = grep { ($_ % $mod) == $shard } @$uids if $mod;
while (scalar @$uids) {
my @batch = splice(@$uids, 0, $bs);
$batch = join(',', @batch);