I'm not sure whether "prune-mail-sync" needs to be exposed, but I suppose it could be useful in some cases. It's certainly easier to implement. "lei tag" gets a nice speedup for Maildirs; an IMAP speedup is probably better done after we get IMAP pipelining, which is far in the future. I'm not sure if anybody is using InboxWritable->import_maildir... Eric Wong (5): inbox_writable: fix import_maildir mdir_reader: maildir_each_file: pass flags, skip Trash lei tag: parallelize Maildir access lei_mail_sync: hoist out --all handling from export-kw lei prune-mail-sync: new command to prune invalid sync data MANIFEST | 1 + lib/PublicInbox/InboxWritable.pm | 15 ++--- lib/PublicInbox/LEI.pm | 2 + lib/PublicInbox/LeiExportKw.pm | 32 +--------- lib/PublicInbox/LeiImport.pm | 12 ++-- lib/PublicInbox/LeiMailSync.pm | 36 +++++++++++ lib/PublicInbox/LeiPmdir.pm | 18 ++---- lib/PublicInbox/LeiPruneMailSync.pm | 97 +++++++++++++++++++++++++++++ lib/PublicInbox/LeiTag.pm | 8 ++- lib/PublicInbox/MdirReader.pm | 5 +- lib/PublicInbox/NetReader.pm | 19 ++++++ lib/PublicInbox/NetWriter.pm | 21 +------ 12 files changed, 184 insertions(+), 82 deletions(-) create mode 100644 lib/PublicInbox/LeiPruneMailSync.pm
I'm not sure if anybody uses this, but it exists. It'll likely be dropped in the future. Fixes: fa3f0cbcd1af5008 ("use MdirReader in -watch and InboxWritable") --- lib/PublicInbox/InboxWritable.pm | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/lib/PublicInbox/InboxWritable.pm b/lib/PublicInbox/InboxWritable.pm index 45d8cdc7..65539781 100644 --- a/lib/PublicInbox/InboxWritable.pm +++ b/lib/PublicInbox/InboxWritable.pm @@ -131,13 +131,9 @@ sub eml_from_path ($) { } } -sub _each_maildir_fn { - my ($fn, $im, $self) = @_; - if ($fn =~ /:2,([A-Za-z]*)\z/) { - my $fl = $1; - return if $fl =~ /[DT]/; # no Drafts or Trash for public - } - my $eml = eml_from_path($fn) or return; +sub _each_maildir_eml { + my ($fn, $kw, $eml, $im, $self) = @_; + return if grep(/\Adraft\z/, @$kw); if ($self && (my $filter = $self->filter($im))) { my $ret = $filter->scrub($eml) or return; return if $ret == REJECT(); @@ -146,6 +142,7 @@ sub _each_maildir_fn { $im->add($eml); } +# XXX does anybody use this? sub import_maildir { my ($self, $dir) = @_; foreach my $sub (qw(cur new tmp)) { @@ -154,8 +151,8 @@ sub import_maildir { my $im = $self->importer(1); my @self = $self->filter($im) ? ($self) : (); require PublicInbox::MdirReader; - PublicInbox::MdirReader->new->maildir_each_file(\&_each_maildir_fn, - $im, @self); + PublicInbox::MdirReader->new->maildir_each_eml($dir, + \&_each_maildir_eml, $im, @self); $im->done; }
This is a slight behavior change for "lei q": Trashed (but not-yet-expunged) messages no longer get unlinked when --output is used without --augment. --- lib/PublicInbox/LeiImport.pm | 4 +--- lib/PublicInbox/LeiPmdir.pm | 8 ++++---- lib/PublicInbox/MdirReader.pm | 5 +++-- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index b0e7ba6b..cddd5619 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -30,11 +30,9 @@ sub input_mbox_cb { # MboxReader callback } sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn - my ($self, $f, @args) = @_; + my ($self, $f, $fl) = @_; my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or die "BUG: $f was not from a Maildir?\n"; - my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn); - return if index($fl, 'T') >= 0; # no Trashed messages my $kw = PublicInbox::MdirReader::flags2kw($fl); substr($folder, 0, 0) = 'maildir:'; # add prefix my $lms = $self->{-lms_ro}; diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm index b71efe70..aa9ce713 100644 --- a/lib/PublicInbox/LeiPmdir.pm +++ b/lib/PublicInbox/LeiPmdir.pm @@ -42,13 +42,13 @@ sub ipc_atfork_child { } sub each_mdir_fn { # maildir_each_file callback - my ($f, $self, @args) = @_; - $self->wq_io_do('mdir_iter', [], $f, @args); + my ($f, $fl, $self, @args) = @_; + $self->wq_io_do('mdir_iter', [], $f, $fl, @args); } sub mdir_iter { # via wq_io_do - my ($self, $f, @args) = @_; - $self->{ipt}->pmdir_cb($f, @args); + my ($self, $f, $fl, @args) = @_; + $self->{ipt}->pmdir_cb($f, $fl, @args); } sub pmd_done_wait { diff --git a/lib/PublicInbox/MdirReader.pm b/lib/PublicInbox/MdirReader.pm index 484bf0a8..dbb74d6d 100644 --- a/lib/PublicInbox/MdirReader.pm +++ b/lib/PublicInbox/MdirReader.pm @@ -42,9 +42,10 @@ sub maildir_each_file { my $pfx = $dir.$d; opendir my $dh, $pfx or next; while (defined(my $bn = readdir($dh))) { - 
maildir_basename_flags($bn) // next; + my $fl = maildir_basename_flags($bn) // next; next if defined($mod) && !shard_ok($bn, $mod, $shard); - $cb->($pfx.$bn, @arg); + next if index($fl, 'T') >= 0; # no Trashed messages + $cb->($pfx.$bn, $fl, @arg); } } }
Since Maildir doesn't guarantee any ordering of messages, we can parallelize Maildir inputs here. On a 4-core system, this reduced one of my tag invocations from 5.5s to 1.4s. --- lib/PublicInbox/LeiImport.pm | 8 ++++---- lib/PublicInbox/LeiPmdir.pm | 10 ++-------- lib/PublicInbox/LeiTag.pm | 8 +++++--- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index cddd5619..e3cb69ca 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -35,13 +35,13 @@ sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn die "BUG: $f was not from a Maildir?\n"; my $kw = PublicInbox::MdirReader::flags2kw($fl); substr($folder, 0, 0) = 'maildir:'; # add prefix - my $lms = $self->{-lms_ro}; + my $lse = $self->{lse} //= $self->{lei}->{sto}->search; + my $lms = $self->{-lms_ro} //= $lse->lms; # may be 0 or undef my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef; - my @docids = defined($oidbin) ? - $self->{over}->oidbin_exists($oidbin) : (); + my @docids = defined($oidbin) ? $lse->over->oidbin_exists($oidbin) : (); my $vmd = $self->{-import_kw} ?
{ kw => $kw } : undef; if (scalar @docids) { - $self->{lse}->kw_changed(undef, $kw, \@docids) or return; + $lse->kw_changed(undef, $kw, \@docids) or return; } if (my $eml = eml_from_path($f)) { $vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync}; diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm index aa9ce713..760f276c 100644 --- a/lib/PublicInbox/LeiPmdir.pm +++ b/lib/PublicInbox/LeiPmdir.pm @@ -30,15 +30,9 @@ sub new { sub ipc_atfork_child { my ($self) = @_; - my $lei = $self->{lei}; - $lei->_lei_atfork_child; my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}'; - $ipt->{lei} = $lei; - $ipt->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}'; - $ipt->{lse} = $ipt->{sto}->search; - $ipt->{over} = $ipt->{lse}->over; - $ipt->{-lms_ro} //= $ipt->{lse}->lms; # may be undef or '0' - $self->SUPER::ipc_atfork_child; + $ipt->{lei} = $self->{lei}; + $ipt->ipc_atfork_child; } sub each_mdir_fn { # maildir_each_file callback diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm index 4b3ce7d8..e0532653 100644 --- a/lib/PublicInbox/LeiTag.pm +++ b/lib/PublicInbox/LeiTag.pm @@ -6,6 +6,7 @@ package PublicInbox::LeiTag; use strict; use v5.10.1; use parent qw(PublicInbox::IPC PublicInbox::LeiInput); +use PublicInbox::InboxWritable qw(eml_from_path); sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh my ($self, $eml) = @_; @@ -24,8 +25,9 @@ sub input_mbox_cb { input_eml_cb($self, $eml); } -sub input_maildir_cb { # maildir_each_eml cb - my ($f, $kw, $eml, $self) = @_; +sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn + my ($self, $f) = @_; + my $eml = eml_from_path($f) or return; input_eml_cb($self, $eml); } @@ -42,12 +44,12 @@ sub lei_tag { # the "lei tag" method $lei->ale; # refresh and prepare my $vmd_mod = $self->vmd_mod_extract(\@argv); return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err}; + $self->{vmd_mod} = $vmd_mod; # before LeiPmdir->new in prepare_inputs $self->prepare_inputs($lei, \@argv) 
or return; grep(defined, @$vmd_mod{qw(+kw +L -L -kw)}) or return $lei->fail('no keywords or labels specified'); my $ops = {}; $lei->{auth}->op_merge($ops, $self) if $lei->{auth}; - $self->{vmd_mod} = $vmd_mod; my $j = $self->{-wq_nr_workers} = 1; # locked for now (my $op_c, $ops) = $lei->workers_start($self, $j, $ops); $lei->{wq1} = $self;
We'll be reusing it in other commands, too. --- lib/PublicInbox/LeiExportKw.pm | 32 ++---------------------------- lib/PublicInbox/LeiMailSync.pm | 36 ++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/lib/PublicInbox/LeiExportKw.pm b/lib/PublicInbox/LeiExportKw.pm index f8579221..671a84df 100644 --- a/lib/PublicInbox/LeiExportKw.pm +++ b/lib/PublicInbox/LeiExportKw.pm @@ -94,36 +94,8 @@ EOM lei mail_sync uninitialized, see lei-import(1) EOM my $opt = $lei->{opt}; - my $all = $opt->{all}; - if (defined $all) { # --all=<local|remote> - my %x = map { $_ => $_ } split(/,/, $all); - my @ok = grep(defined, delete(@x{qw(local remote), ''})); - my @no = keys %x; - if (@no) { - @no = (join(',', @no)); - return $lei->fail(<<EOM); ---all=@no not accepted (must be `local' and/or `remote') -EOM - } - my (%seen, @inc); - my @all = $lms->folders; - for my $ok (@ok) { - if ($ok eq 'local') { - @inc = grep(!m!\A[a-z0-9\+]+://!i, @all); - } elsif ($ok eq 'remote') { - @inc = grep(m!\A[a-z0-9\+]+://!i, @all); - } elsif ($ok ne '') { - return $lei->fail("--all=$all not understood"); - } else { - @inc = @all; - } - for (@inc) { - push(@folders, $_) unless $seen{$_}++; - } - } - return $lei->fail(<<EOM) if !@folders; -no --mail-sync folders known to lei -EOM + if (defined(my $all = $opt->{all})) { # --all=<local|remote> + $lms->group2folders($lei, $all, \@folders) or return; } else { my $err = $lms->arg2folder($lei, \@folders); $lei->qerr(@{$err->{qerr}}) if $err->{qerr}; diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm index ec05404a..af47439a 100644 --- a/lib/PublicInbox/LeiMailSync.pm +++ b/lib/PublicInbox/LeiMailSync.pm @@ -315,6 +315,42 @@ sub match_imap_url { "E: `$url' is ambiguous:\n\t".join("\n\t", @match)."\n"; } +# returns undef on failure, number on success +sub group2folders { + my ($self, $lei, $all, $folders) = @_; + return $lei->fail(<<EOM) if @$folders; +--all= not compatible with @$folders on 
command-line +EOM + my %x = map { $_ => $_ } split(/,/, $all); + my @ok = grep(defined, delete(@x{qw(local remote), ''})); + my @no = keys %x; + if (@no) { + @no = (join(',', @no)); + return $lei->fail(<<EOM); +--all=@no not accepted (must be `local' and/or `remote') +EOM + } + my (%seen, @inc); + my @all = $self->folders; + for my $ok (@ok) { + if ($ok eq 'local') { + @inc = grep(!m!\A[a-z0-9\+]+://!i, @all); + } elsif ($ok eq 'remote') { + @inc = grep(m!\A[a-z0-9\+]+://!i, @all); + } elsif ($ok ne '') { + return $lei->fail("--all=$all not understood"); + } else { + @inc = @all; + } + for (@inc) { + push(@$folders, $_) unless $seen{$_}++; + } + } + scalar(@$folders) || $lei->fail(<<EOM); +no --mail-sync folders known to lei +EOM +} + # map CLI args to folder table entries, returns undef on failure sub arg2folder { my ($self, $lei, $folders) = @_;
This will be invoked automatically by "lei import" eventually, but it may make sense to expose as a separate command. --- MANIFEST | 1 + lib/PublicInbox/LEI.pm | 2 + lib/PublicInbox/LeiPruneMailSync.pm | 97 +++++++++++++++++++++++++++++ lib/PublicInbox/NetReader.pm | 19 ++++++ lib/PublicInbox/NetWriter.pm | 21 +------ 5 files changed, 121 insertions(+), 19 deletions(-) create mode 100644 lib/PublicInbox/LeiPruneMailSync.pm diff --git a/MANIFEST b/MANIFEST index 7bdbf252..3d4c6cbd 100644 --- a/MANIFEST +++ b/MANIFEST @@ -222,6 +222,7 @@ lib/PublicInbox/LeiMirror.pm lib/PublicInbox/LeiOverview.pm lib/PublicInbox/LeiP2q.pm lib/PublicInbox/LeiPmdir.pm +lib/PublicInbox/LeiPruneMailSync.pm lib/PublicInbox/LeiQuery.pm lib/PublicInbox/LeiRediff.pm lib/PublicInbox/LeiRemote.pm diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 77fc5b8f..265b7047 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -245,6 +245,8 @@ our %CMD = ( # sorted in order of importance/use: qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt ], 'forget-mail-sync' => [ 'LOCATION...', 'forget sync information for a mail folder', @c_opt ], +'prune-mail-sync' => [ 'LOCATION...|--all', + 'prune dangling sync data for a mail folder', 'all:s', @c_opt ], 'export-kw' => [ 'LOCATION...|--all', 'one-time export of keywords of sync sources', qw(all:s mode=s), @c_opt ], diff --git a/lib/PublicInbox/LeiPruneMailSync.pm b/lib/PublicInbox/LeiPruneMailSync.pm new file mode 100644 index 00000000..79f3325d --- /dev/null +++ b/lib/PublicInbox/LeiPruneMailSync.pm @@ -0,0 +1,97 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# "lei prune-mail-sync" drops dangling sync information +package PublicInbox::LeiPruneMailSync; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC PublicInbox::LeiInput); +use PublicInbox::LeiExportKw; +use PublicInbox::InboxWritable qw(eml_from_path); + +sub 
eml_match ($$) { + my ($eml, $oidbin) = @_; + $oidbin eq git_sha(length($oidbin) == 20 ? 1 : 256, $eml)->digest; +} + +sub prune_mdir { # lms->each_src callback + my ($oidbin, $id, $self, $mdir) = @_; + my @try = $$id =~ /:2,[a-zA-Z]*\z/ ? qw(cur new) : qw(new cur); + for my $d (@try) { + my $src = "$mdir/$d/$$id"; + if ($self->{verify}) { + my $eml = eml_from_path($src) or next; + return if eml_match($eml, $oidbin); + } elsif (-f $src) { + return; + } + } + # both tries failed + $self->{lei}->qerr("# maildir:$mdir $$id gone"); + $self->{lei}->{sto}->ipc_do('lms_clear_src', "maildir:$mdir", $id); +} + +sub prune_imap { # lms->each_src callback + my ($oidbin, $uid, $self, $uids, $url) = @_; + return if exists $uids->{$uid}; + $self->{lei}->qerr("# $url $uid gone"); + $self->{lei}->{sto}->ipc_do('lms_clear_src', $url, $uid); +} + +sub input_path_url { # overrides PublicInbox::LeiInput::input_path_url + my ($self, $input, @args) = @_; + my $lms = $self->{-lms_ro} //= $self->{lse}->lms; + if ($input =~ /\Amaildir:(.+)/i) { + my $mdir = $1; + $lms->each_src($input, \&prune_mdir, $self, $mdir); + } elsif ($input =~ m!\Aimaps?://!i) { + my $uri = PublicInbox::URIimap->new($input); + my $mic = $self->{lei}->{net}->mic_for_folder($uri); + my $uids = $mic->search('UID 1:*'); + $uids = +{ map { $_ => undef } @$uids }; + $lms->each_src($$uri, \&prune_imap, $self, $uids, $$uri); + } else { die "BUG: $input not supported" } + my $wait = $self->{lei}->{sto}->ipc_do('done'); +} + +sub lei_prune_mail_sync { + my ($lei, @folders) = @_; + my $sto = $lei->_lei_store or return $lei->fail(<<EOM); +lei/store uninitialized, see lei-import(1) +EOM + my $lse = $sto->search; + my $lms = $lse->lms or return $lei->fail(<<EOM); +lei mail_sync uninitialized, see lei-import(1) +EOM + if (defined(my $all = $lei->{opt}->{all})) { + $lms->group2folders($lei, $all, \@folders) or return; + } else { + my $err = $lms->arg2folder($lei, \@folders); + $lei->qerr(@{$err->{qerr}}) if $err->{qerr}; + return 
$lei->fail($err->{fail}) if $err->{fail}; + } + delete $lms->{dbh}; + $sto->write_prepare($lei); + my $self = bless { lse => $lse }, __PACKAGE__; + $lei->{opt}->{'mail-sync'} = 1; # for prepare_inputs + $self->prepare_inputs($lei, \@folders) or return; + my $j = $lei->{opt}->{jobs} || scalar(@{$self->{inputs}}) || 1; + undef $lms; # for fork + my $ops = {}; + $sto->write_prepare($lei); + $lei->{auth}->op_merge($ops, $self) if $lei->{auth}; + $self->{-wq_nr_workers} = $j // 1; # locked + (my $op_c, $ops) = $lei->workers_start($self, $j, $ops); + $lei->{wq1} = $self; + $lei->{-err_type} = 'non-fatal'; + net_merge_all_done($self) unless $lei->{auth}; + $lei->wait_wq_events($op_c, $ops); # net_merge_all_done if !{auth} +} + +no warnings 'once'; +*_complete_prune_mail_sync = \&PublicInbox::LeiExportKw::_complete_export_kw; +*ipc_atfork_child = \&PublicInbox::LeiInput::input_only_atfork_child; +*net_merge_all = \&PublicInbox::LeiAuth::net_merge_all; +*net_merge_all_done = \&PublicInbox::LeiInput::input_only_net_merge_all_done; + +1; diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index 058f4313..2795a9d4 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -747,4 +747,23 @@ sub nntp_each { sub new { bless {}, shift }; +# updates $uri with UIDVALIDITY +sub mic_for_folder { + my ($self, $uri) = @_; + my $mic = $self->mic_get($uri) or die "E: not connected: $@"; + my $m = $self->isa('PublicInbox::NetWriter') ? 
'select' : 'examine'; + $mic->$m($uri->mailbox) or return; + my $uidval; + for ($mic->Results) { + /^\* OK \[UIDVALIDITY ([0-9]+)\].*/ or next; + $uidval = $1; + last; + } + $uidval //= $mic->uidvalidity($uri->mailbox) or + die "E: failed to get uidvalidity from <$uri>: $@"; + $uri->uidvalidity($uidval); + $mic; +} + + 1; diff --git a/lib/PublicInbox/NetWriter.pm b/lib/PublicInbox/NetWriter.pm index 8ec7f85c..82288e6b 100644 --- a/lib/PublicInbox/NetWriter.pm +++ b/lib/PublicInbox/NetWriter.pm @@ -26,26 +26,9 @@ sub imap_append { die "APPEND $folder: $@"; } -# updates $uri with UIDVALIDITY -sub mic_for_folder { - my ($self, $uri) = @_; - my $mic = $self->mic_get($uri) or die "E: not connected: $@"; - $mic->select($uri->mailbox) or return; - my $uidval; - for ($mic->Results) { - /^\* OK \[UIDVALIDITY ([0-9]+)\].*/ or next; - $uidval = $1; - last; - } - $uidval //= $mic->uidvalidity($uri->mailbox) or - die "E: failed to get uidvalidity from <$uri>: $@"; - $uri->uidvalidity($uidval); - $mic; -} - sub imap_delete_all { my ($self, $uri) = @_; - my $mic = mic_for_folder($self, $uri) or return; + my $mic = $self->mic_for_folder($uri) or return; my $sec = $self->can('uri_section')->($uri); local $0 = $uri->mailbox." $sec"; if ($mic->delete_message('1:*')) { @@ -55,7 +38,7 @@ sub imap_delete_all { sub imap_delete_1 { my ($self, $uri, $uid, $delete_mic) = @_; - $$delete_mic //= mic_for_folder($self, $uri) or return; + $$delete_mic //= $self->mic_for_folder($uri) or return; $$delete_mic->delete_message($uid); }