IMAP write support for search results is planned, but testing could get tricky... Still unsure about some UI bits w.r.t --format/-f: https://public-inbox.org/meta/20210217044032.GA17934@dcvr/ convert and import should support parallel network xfers, NNTP reads, and eventually JMAP... convert and import don't support compressed mboxes, yet. Eric Wong (11): lei: bless config watch: move imap_common_init to NetReader watch: connect to NNTP and IMAP in config order lei import: start rearranging code for IMAP support lei import: move check_input_format to lei tests: setup_public_inboxes: use IMAP-friendly newsgroups t/lei_to_mail: remove unnecessary arg passing lei convert: mail format conversion sub-command lei import: add IMAP, (maildir|mbox*):$PATHNAME support lei: consolidate the bulk of the IPC code lei: check for IMAP auth errors MANIFEST | 11 +- lib/PublicInbox/GitCredential.pm | 18 ++- lib/PublicInbox/LEI.pm | 62 +++++++- lib/PublicInbox/LeiAuth.pm | 70 +++++++++ lib/PublicInbox/LeiConvert.pm | 137 +++++++++++++++++ lib/PublicInbox/LeiDedupe.pm | 2 +- lib/PublicInbox/LeiImport.pm | 156 +++++++++++++------- lib/PublicInbox/LeiMirror.pm | 19 +-- lib/PublicInbox/LeiOverview.pm | 7 +- lib/PublicInbox/LeiToMail.pm | 5 +- lib/PublicInbox/MdirReader.pm | 26 ++++ lib/PublicInbox/NetReader.pm | 242 +++++++++++++++++++++++++++++-- lib/PublicInbox/TestCommon.pm | 15 +- lib/PublicInbox/Watch.pm | 82 ++--------- t/{home1 => home2}/.gitignore | 0 t/{home1 => home2}/Makefile | 0 t/{home1 => home2}/README | 0 t/lei-convert.t | 36 +++++ t/lei-import-imap.t | 28 ++++ t/lei-import-maildir.t | 4 +- t/lei_to_mail.t | 14 +- t/net_reader-imap.t | 40 +++++ xt/lei-auth-fail.t | 20 +++ 23 files changed, 820 insertions(+), 174 deletions(-) create mode 100644 lib/PublicInbox/LeiAuth.pm create mode 100644 lib/PublicInbox/LeiConvert.pm rename t/{home1 => home2}/.gitignore (100%) rename t/{home1 => home2}/Makefile (100%) rename t/{home1 => home2}/README (100%) create mode 100644 
t/lei-convert.t create mode 100644 t/lei-import-imap.t create mode 100644 t/net_reader-imap.t create mode 100644 xt/lei-auth-fail.t
We'll be needing ->urlmatch from PublicInbox::Config, so bless the lei config hash into that class. --- lib/PublicInbox/LEI.pm | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index aa14ca6f..12deedd8 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -563,6 +563,7 @@ sub _lei_cfg ($;$) { qerr($self, "# $f created") if $self->{cmd} ne 'config'; } my $cfg = PublicInbox::Config::git_config_dump($f); + bless $cfg, 'PublicInbox::Config'; $cfg->{-st} = $cur_st; $cfg->{'-f'} = $f; $self->{cfg} = $PATH2CFG{$f} = $cfg;
We'll use this in LeiImport and likely other places. --- lib/PublicInbox/NetReader.pm | 65 +++++++++++++++++++++++++++++++++++- lib/PublicInbox/Watch.pm | 65 +----------------------------------- 2 files changed, 65 insertions(+), 65 deletions(-) diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index 79047fd2..8c919f66 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -10,7 +10,9 @@ use parent qw(Exporter); # TODO: trim this down, this is huge our @EXPORT = qw(uri_new uri_scheme uri_section mic_for nn_new nn_for - imap_url nntp_url); + imap_url nntp_url + cfg_bool cfg_intvl imap_common_init + ); # avoid exposing deprecated "snews" to users. my %SCHEME_MAP = ('snews' => 'nntps'); @@ -217,4 +219,65 @@ sub nntp_url { $url; } +sub cfg_intvl ($$$) { + my ($cfg, $key, $url) = @_; + my $v = $cfg->urlmatch($key, $url) // return; + $v =~ /\A[0-9]+(?:\.[0-9]+)?\z/s and return $v + 0; + if (ref($v) eq 'ARRAY') { + $v = join(', ', @$v); + warn "W: $key has multiple values: $v\nW: $key ignored\n"; + } else { + warn "W: $key=$v is not a numeric value in seconds\n"; + } +} + +sub cfg_bool ($$$) { + my ($cfg, $key, $url) = @_; + my $orig = $cfg->urlmatch($key, $url) // return; + my $bool = $cfg->git_bool($orig); + warn "W: $key=$orig for $url is not boolean\n" unless defined($bool); + $bool; +} + +# flesh out common IMAP-specific data structures +sub imap_common_init ($) { + my ($self) = @_; + eval { require PublicInbox::IMAPClient } or + die "Mail::IMAPClient is required for IMAP:\n$@\n"; + eval { require PublicInbox::IMAPTracker } or + die "DBD::SQLite is required for IMAP\n:$@\n"; + require PublicInbox::URIimap; + my $cfg = $self->{pi_cfg}; + my $mic_args = {}; # scheme://authority => Mail:IMAPClient arg + for my $url (sort keys %{$self->{imap}}) { + my $uri = PublicInbox::URIimap->new($url); + my $sec = uri_section($uri); + for my $k (qw(Starttls Debug Compress)) { + my $bool = cfg_bool($cfg, "imap.$k", $url) // next; + 
$mic_args->{$sec}->{$k} = $bool; + } + my $to = cfg_intvl($cfg, 'imap.timeout', $url); + $mic_args->{$sec}->{Timeout} = $to if $to; + for my $k (qw(pollInterval idleInterval)) { + $to = cfg_intvl($cfg, "imap.$k", $url) // next; + $self->{imap_opt}->{$sec}->{$k} = $to; + } + my $k = 'imap.fetchBatchSize'; + my $bs = $cfg->urlmatch($k, $url) // next; + if ($bs =~ /\A([0-9]+)\z/) { + $self->{imap_opt}->{$sec}->{batch_size} = $bs; + } else { + warn "$k=$bs is not an integer\n"; + } + } + # make sure we can connect and cache the credentials in memory + $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args + my $mics = {}; # schema://authority => IMAPClient obj + for my $url (sort keys %{$self->{imap}}) { + my $uri = PublicInbox::URIimap->new($url); + $mics->{uri_section($uri)} //= mic_for($self, $url, $mic_args); + } + $mics; +} + 1; diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm index 8a457b81..6b6be44c 100644 --- a/lib/PublicInbox/Watch.pm +++ b/lib/PublicInbox/Watch.pm @@ -280,55 +280,6 @@ sub watch_fs_init ($) { PublicInbox::DirIdle->new([keys %{$self->{mdmap}}], $cb); } -sub cfg_intvl ($$$) { - my ($cfg, $key, $url) = @_; - my $v = $cfg->urlmatch($key, $url) // return; - $v =~ /\A[0-9]+(?:\.[0-9]+)?\z/s and return $v + 0; - if (ref($v) eq 'ARRAY') { - $v = join(', ', @$v); - warn "W: $key has multiple values: $v\nW: $key ignored\n"; - } else { - warn "W: $key=$v is not a numeric value in seconds\n"; - } -} - -sub cfg_bool ($$$) { - my ($cfg, $key, $url) = @_; - my $orig = $cfg->urlmatch($key, $url) // return; - my $bool = $cfg->git_bool($orig); - warn "W: $key=$orig for $url is not boolean\n" unless defined($bool); - $bool; -} - -# flesh out common IMAP-specific data structures -sub imap_common_init ($) { - my ($self) = @_; - my $cfg = $self->{pi_cfg}; - my $mic_args = {}; # scheme://authority => Mail:IMAPClient arg - for my $url (sort keys %{$self->{imap}}) { - my $uri = PublicInbox::URIimap->new($url); - my $sec = 
uri_section($uri); - for my $k (qw(Starttls Debug Compress)) { - my $bool = cfg_bool($cfg, "imap.$k", $url) // next; - $mic_args->{$sec}->{$k} = $bool; - } - my $to = cfg_intvl($cfg, 'imap.timeout', $url); - $mic_args->{$sec}->{Timeout} = $to if $to; - for my $k (qw(pollInterval idleInterval)) { - $to = cfg_intvl($cfg, "imap.$k", $url) // next; - $self->{imap_opt}->{$sec}->{$k} = $to; - } - my $k = 'imap.fetchBatchSize'; - my $bs = $cfg->urlmatch($k, $url) // next; - if ($bs =~ /\A([0-9]+)\z/) { - $self->{imap_opt}->{$sec}->{batch_size} = $bs; - } else { - warn "$k=$bs is not an integer\n"; - } - } - $mic_args; -} - sub imap_import_msg ($$$$$) { my ($self, $url, $uid, $raw, $flags) = @_; # our target audience expects LF-only, save storage @@ -667,21 +618,7 @@ sub poll_fetch_reap { sub watch_imap_init ($$) { my ($self, $poll) = @_; - eval { require PublicInbox::IMAPClient } or - die "Mail::IMAPClient is required for IMAP:\n$@\n"; - eval { require PublicInbox::IMAPTracker } or - die "DBD::SQLite is required for IMAP\n:$@\n"; - - my $mic_args = imap_common_init($self); # read args from config - - # make sure we can connect and cache the credentials in memory - $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args - my $mics = {}; # schema://authority => IMAPClient obj - for my $url (sort keys %{$self->{imap}}) { - my $uri = PublicInbox::URIimap->new($url); - $mics->{uri_section($uri)} //= mic_for($self, $url, $mic_args); - } - + my $mics = imap_common_init($self); # read args from config my $idle = []; # [ [ url1, intvl1 ], [url2, intvl2] ] for my $url (keys %{$self->{imap}}) { my $uri = PublicInbox::URIimap->new($url);
This is hopefully less surprising to users when they're prompted for credentials. --- lib/PublicInbox/NetReader.pm | 4 ++-- lib/PublicInbox/Watch.pm | 17 ++++++++++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index 8c919f66..fa337bcd 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -249,7 +249,7 @@ sub imap_common_init ($) { require PublicInbox::URIimap; my $cfg = $self->{pi_cfg}; my $mic_args = {}; # scheme://authority => Mail:IMAPClient arg - for my $url (sort keys %{$self->{imap}}) { + for my $url (@{$self->{imap_order}}) { my $uri = PublicInbox::URIimap->new($url); my $sec = uri_section($uri); for my $k (qw(Starttls Debug Compress)) { @@ -273,7 +273,7 @@ sub imap_common_init ($) { # make sure we can connect and cache the credentials in memory $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args my $mics = {}; # schema://authority => IMAPClient obj - for my $url (sort keys %{$self->{imap}}) { + for my $url (@{$self->{imap_order}}) { my $uri = PublicInbox::URIimap->new($url); $mics->{uri_section($uri)} //= mic_for($self, $url, $mic_args); } diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm index 6b6be44c..c64689a1 100644 --- a/lib/PublicInbox/Watch.pm +++ b/lib/PublicInbox/Watch.pm @@ -46,6 +46,7 @@ sub new { my ($class, $cfg) = @_; my (%mdmap, $spamc); my (%imap, %nntp); # url => [inbox objects] or 'watchspam' + my (@imap, @nntp); # "publicinboxwatch" is the documented namespace # "publicinboxlearn" is legacy but may be supported @@ -61,8 +62,10 @@ sub new { $mdmap{"$dir/cur"} = 'watchspam'; } elsif ($url = imap_url($dir)) { $imap{$url} = 'watchspam'; + push @imap, $url; } elsif ($url = nntp_url($dir)) { $nntp{$url} = 'watchspam'; + push @nntp, $url; } else { warn "unsupported $k=$dir\n"; } @@ -92,11 +95,13 @@ sub new { } elsif ($url = imap_url($watch)) { return if is_watchspam($url, $imap{$url}, $ibx); 
compile_watchheaders($ibx); - push @{$imap{$url} ||= []}, $ibx; + my $n = push @{$imap{$url} ||= []}, $ibx; + push @imap, $url if $n == 1; } elsif ($url = nntp_url($watch)) { return if is_watchspam($url, $nntp{$url}, $ibx); compile_watchheaders($ibx); - push @{$nntp{$url} ||= []}, $ibx; + my $n = push @{$nntp{$url} ||= []}, $ibx; + push @nntp, $url if $n == 1; } else { warn "watch unsupported: $k=$watch\n"; } @@ -118,6 +123,8 @@ sub new { pi_cfg => $cfg, imap => scalar keys %imap ? \%imap : undef, nntp => scalar keys %nntp? \%nntp : undef, + imap_order => scalar(@imap) ? \@imap : undef, + nntp_order => scalar(@nntp) ? \@nntp: undef, importers => {}, opendirs => {}, # dirname => dirhandle (in progress scans) ops => [], # 'quit', 'full' @@ -643,7 +650,7 @@ sub nntp_common_init ($) { my ($self) = @_; my $cfg = $self->{pi_cfg}; my $nn_args = {}; # scheme://authority => Net::NNTP->new arg - for my $url (sort keys %{$self->{nntp}}) { + for my $url (@{$self->{nntp_order}}) { my $sec = uri_section(uri_new($url)); # Debug and Timeout are passed to Net::NNTP->new @@ -755,10 +762,10 @@ sub watch_nntp_init ($$) { # make sure we can connect and cache the credentials in memory $self->{nn_arg} = {}; # schema://authority => Net::NNTP->new args - for my $url (sort keys %{$self->{nntp}}) { + for my $url (@{$self->{nntp_order}}) { nn_for($self, $url, $nn_args); } - for my $url (keys %{$self->{nntp}}) { + for my $url (@{$self->{nntp_order}}) { my $uri = uri_new($url); my $sec = uri_section($uri); my $intvl = $self->{nntp_opt}->{$sec}->{pollInterval};
More to come in a later commit; some error handling and failure modes will be trickier with IMAP due to authentication. --- lib/PublicInbox/LeiImport.pm | 74 +++++++++++++++++++++++++----------- lib/PublicInbox/NetReader.pm | 19 +++++++++ 2 files changed, 71 insertions(+), 22 deletions(-) diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 8358d9d4..b25d7e97 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -29,26 +29,21 @@ sub import_done { # EOF callback for main daemon $imp->wq_wait_old(\&import_done_wait, $lei); } -sub call { # the main "lei import" method - my ($cls, $lei, @argv) = @_; - my $sto = $lei->_lei_store(1); - $sto->write_prepare($lei); - $lei->{opt}->{kw} //= 1; +sub check_fmt ($;$) { + my ($lei, $f) = @_; my $fmt = $lei->{opt}->{'format'}; - my $self = $lei->{imp} = bless {}, $cls; - my @f; - for my $x (@argv) { - if (-f $x) { push @f, $x } - elsif (-d _) { require PublicInbox::MdirReader } - } - (@f && !$fmt) and - return $lei->fail("--format unset for regular file(s):\n@f"); - if (@f && $fmt ne 'eml') { - require PublicInbox::MboxReader; - PublicInbox::MboxReader->can($fmt) or - return $lei->fail( "--format=$fmt unrecognized\n"); + if (!$fmt) { + my $err = $f ? "regular file(s):\n@$f" : '--stdin'; + return $lei->fail("--format unset for $err"); } - $self->{0} = $lei->{0} if $lei->{opt}->{stdin}; + return 1 if $fmt eq 'eml'; + require PublicInbox::MboxReader; + PublicInbox::MboxReader->can($fmt) || + $lei->fail( "--format=$fmt unrecognized\n"); +} + +sub do_import { + my ($lei) = @_; my $ops = { '!' 
=> [ $lei->can('fail_handler'), $lei ], 'x_it' => [ $lei->can('x_it'), $lei ], @@ -56,14 +51,19 @@ sub call { # the main "lei import" method '' => [ \&import_done, $lei ], }; ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); - my $j = $lei->{opt}->{jobs} // scalar(@argv) || 1; - my $nproc = $self->detect_nproc; - $j = $nproc if $j > $nproc; + my $self = $lei->{imp}; + my $j = $lei->{opt}->{jobs} // scalar(@{$self->{argv}}) || 1; + if (my $nrd = $lei->{nrd}) { + # $j = $nrd->net_concurrency($j); TODO + } else { + my $nproc = $self->detect_nproc; + $j = $nproc if $j > $nproc; + } $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei}); my $op = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; $self->wq_io_do('import_stdin', []) if $self->{0}; - for my $x (@argv) { + for my $x (@{$self->{argv}}) { $self->wq_io_do('import_path_url', [], $x); } $self->wq_close(1); @@ -73,6 +73,36 @@ sub call { # the main "lei import" method } } +sub call { # the main "lei import" method + my ($cls, $lei, @argv) = @_; + my $sto = $lei->_lei_store(1); + $sto->write_prepare($lei); + $lei->{opt}->{kw} //= 1; + my $self = $lei->{imp} = bless { argv => \@argv }, $cls; + if ($lei->{opt}->{stdin}) { + @argv and return + $lei->fail("--stdin and locations (@argv) do not mix"); + check_fmt($lei) or return; + $self->{0} = $lei->{0}; + } else { + my @f; + for my $x (@argv) { + if (-f $x) { push @f, $x } + elsif (-d _) { require PublicInbox::MdirReader } + else { + require PublicInbox::NetReader; + $lei->{nrd} //= PublicInbox::NetReader->new; + $lei->{nrd}->add_url($x); + } + } + if (@f) { check_fmt($lei, \@f) or return } + if ($lei->{nrd} && (my @err = $lei->{nrd}->errors)) { + return $lei->fail(@err); + } + } + do_import($lei); +} + sub ipc_atfork_child { my ($self) = @_; $self->{lei}->lei_atfork_child; diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index fa337bcd..1d053425 100644 --- a/lib/PublicInbox/NetReader.pm +++ 
b/lib/PublicInbox/NetReader.pm @@ -280,4 +280,23 @@ sub imap_common_init ($) { $mics; } +sub add_url { + my ($self, $arg) = @_; + if (my $url = imap_url($arg)) { + push @{$self->{imap_order}}, $url; + } else { + push @{$self->{unsupported_url}}, $arg; + } +} + +sub errors { + my ($self) = @_; + if (my $u = $self->{unsupported_url}) { + return "Unsupported URL(s): @$u"; + } + undef; +} + +sub new { bless {}, shift }; + 1;
We'll be supporting "lei convert" in a future change; so it makes sense to share a common internal API for common error messages. --- lib/PublicInbox/LEI.pm | 14 ++++++++++++++ lib/PublicInbox/LeiImport.pm | 17 ++--------------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 12deedd8..1fa9f751 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -391,6 +391,20 @@ sub fail ($$;$) { undef; } +sub check_input_format ($;$) { + my ($self, $files) = @_; + my $fmt = $self->{opt}->{'format'}; + if (!$fmt) { + my $err = $files ? "regular file(s):\n@$files" : '--stdin'; + return fail($self, "--format unset for $err"); + } + return 1 if $fmt eq 'eml'; + # XXX: should this handle {gz,bz2,xz}? that's currently in LeiToMail + require PublicInbox::MboxReader; + PublicInbox::MboxReader->can($fmt) || + fail($self, "--format=$fmt unrecognized"); +} + sub out ($;@) { my $self = shift; return if print { $self->{1} // return } @_; # likely diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index b25d7e97..32f3a467 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -29,19 +29,6 @@ sub import_done { # EOF callback for main daemon $imp->wq_wait_old(\&import_done_wait, $lei); } -sub check_fmt ($;$) { - my ($lei, $f) = @_; - my $fmt = $lei->{opt}->{'format'}; - if (!$fmt) { - my $err = $f ? 
"regular file(s):\n@$f" : '--stdin'; - return $lei->fail("--format unset for $err"); - } - return 1 if $fmt eq 'eml'; - require PublicInbox::MboxReader; - PublicInbox::MboxReader->can($fmt) || - $lei->fail( "--format=$fmt unrecognized\n"); -} - sub do_import { my ($lei) = @_; my $ops = { @@ -82,7 +69,7 @@ sub call { # the main "lei import" method if ($lei->{opt}->{stdin}) { @argv and return $lei->fail("--stdin and locations (@argv) do not mix"); - check_fmt($lei) or return; + $lei->check_input_format or return; $self->{0} = $lei->{0}; } else { my @f; @@ -95,7 +82,7 @@ sub call { # the main "lei import" method $lei->{nrd}->add_url($x); } } - if (@f) { check_fmt($lei, \@f) or return } + if (@f) { $lei->check_input_format(\@f) or return } if ($lei->{nrd} && (my @err = $lei->{nrd}->errors)) { return $lei->fail(@err); }
-imapd won't support newsgroups ending with /\.[0-9]+\z/ since it reserves those for partitioning inboxes into 50K slices. So bump the home[0-9]+ version and switch to IMAP-friendly newsgroup names. --- MANIFEST | 6 +++--- lib/PublicInbox/TestCommon.pm | 4 ++-- t/{home1 => home2}/.gitignore | 0 t/{home1 => home2}/Makefile | 0 t/{home1 => home2}/README | 0 5 files changed, 5 insertions(+), 5 deletions(-) rename t/{home1 => home2}/.gitignore (100%) rename t/{home1 => home2}/Makefile (100%) rename t/{home1 => home2}/README (100%) diff --git a/MANIFEST b/MANIFEST index 1794d930..82068900 100644 --- a/MANIFEST +++ b/MANIFEST @@ -334,9 +334,9 @@ t/git.fast-import-data t/git.t t/gzip_filter.t t/hl_mod.t -t/home1/.gitignore -t/home1/Makefile -t/home1/README +t/home2/.gitignore +t/home2/Makefile +t/home2/README t/html_index.t t/httpd-corner.psgi t/httpd-corner.t diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index d6b7d20e..c5070cfd 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -540,7 +540,7 @@ EOM # returns the pathname to a ~/.public-inbox/config in scalar context, # ($test_home, $pi_config_pathname) in list context sub setup_public_inboxes () { - my $test_home = "t/home1"; + my $test_home = "t/home2"; my $pi_config = "$test_home/.public-inbox/config"; my $stamp = "$test_home/setup-stamp"; my @ret = ($test_home, $pi_config); @@ -555,7 +555,7 @@ sub setup_public_inboxes () { local $ENV{PI_CONFIG} = $pi_config; for my $V (1, 2) { run_script([qw(-init), "-V$V", "t$V", - '--newsgroup', "t.$V", + '--newsgroup', "t.v$V", "$test_home/t$V", "http://example.com/t$V", "t$V\@example.com" ]) or BAIL_OUT "init v$V"; } diff --git a/t/home1/.gitignore b/t/home2/.gitignore similarity index 100% rename from t/home1/.gitignore rename to t/home2/.gitignore diff --git a/t/home1/Makefile b/t/home2/Makefile similarity index 100% rename from t/home1/Makefile rename to t/home2/Makefile diff --git a/t/home1/README b/t/home2/README 
similarity index 100% rename from t/home1/README rename to t/home2/README
{zpipe} is now contained entirely within the $l2m object, so the test no longer passes a $zpipe from pre_augment to post_augment. --- t/lei_to_mail.t | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index 77e9902e..6a571660 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -101,9 +101,9 @@ my $wcb_get = sub { my $dup = Storable::thaw(Storable::freeze($l2m)); is_deeply($dup, $l2m, "$fmt round-trips through storable"); } - my $zpipe = $l2m->pre_augment($lei); + $l2m->pre_augment($lei); $l2m->do_augment($lei); - $l2m->post_augment($lei, $zpipe); + $l2m->post_augment($lei); $l2m->write_cb($lei); };
This will make testing IMAP support for other commands easier, as it doesn't write to lei/store at all. Like the pager and MUA, "git credential" is always spawned by script/lei (and not lei-daemon) so it has a controlling terminal for password prompts. --- MANIFEST | 4 + lib/PublicInbox/GitCredential.pm | 18 ++-- lib/PublicInbox/LEI.pm | 38 +++++-- lib/PublicInbox/LeiAuth.pm | 80 +++++++++++++++ lib/PublicInbox/LeiConvert.pm | 149 ++++++++++++++++++++++++++++ lib/PublicInbox/LeiDedupe.pm | 2 +- lib/PublicInbox/LeiOverview.pm | 7 +- lib/PublicInbox/LeiToMail.pm | 5 +- lib/PublicInbox/MdirReader.pm | 26 +++++ lib/PublicInbox/NetReader.pm | 163 ++++++++++++++++++++++++++++--- lib/PublicInbox/TestCommon.pm | 11 ++- t/lei-convert.t | 36 +++++++ t/net_reader-imap.t | 40 ++++++++ 13 files changed, 542 insertions(+), 37 deletions(-) create mode 100644 lib/PublicInbox/LeiAuth.pm create mode 100644 lib/PublicInbox/LeiConvert.pm create mode 100644 t/lei-convert.t create mode 100644 t/net_reader-imap.t diff --git a/MANIFEST b/MANIFEST index 82068900..4f146771 100644 --- a/MANIFEST +++ b/MANIFEST @@ -178,6 +178,8 @@ lib/PublicInbox/InputPipe.pm lib/PublicInbox/Isearch.pm lib/PublicInbox/KQNotify.pm lib/PublicInbox/LEI.pm +lib/PublicInbox/LeiAuth.pm +lib/PublicInbox/LeiConvert.pm lib/PublicInbox/LeiCurl.pm lib/PublicInbox/LeiDedupe.pm lib/PublicInbox/LeiExternal.pm @@ -360,6 +362,7 @@ t/init.t t/ipc.t t/iso-2202-jp.eml t/kqnotify.t +t/lei-convert.t t/lei-daemon.t t/lei-externals.t t/lei-import-maildir.t @@ -388,6 +391,7 @@ t/msg_iter.t t/msgmap.t t/msgtime.t t/multi-mid.t +t/net_reader-imap.t t/nntp.t t/nntpd-tls.t t/nntpd-v2.t diff --git a/lib/PublicInbox/GitCredential.pm b/lib/PublicInbox/GitCredential.pm index 9e193029..2d81817c 100644 --- a/lib/PublicInbox/GitCredential.pm +++ b/lib/PublicInbox/GitCredential.pm @@ -4,11 +4,17 @@ package PublicInbox::GitCredential; use strict; use PublicInbox::Spawn qw(popen_rd); -sub run ($$) { - my ($self, $op) = @_; - my ($in_r, $in_w); 
+sub run ($$;$) { + my ($self, $op, $lei) = @_; + my ($in_r, $in_w, $out_r); + my $cmd = [ qw(git credential), $op ]; pipe($in_r, $in_w) or die "pipe: $!"; - my $out_r = popen_rd([qw(git credential), $op], undef, { 0 => $in_r }); + if ($lei && !$lei->{oneshot}) { # we'll die if disconnected: + pipe($out_r, my $out_w) or die "pipe: $!"; + $lei->send_exec_cmd([ $in_r, $out_w ], $cmd, {}); + } else { + $out_r = popen_rd($cmd, undef, { 0 => $in_r }); + } close $in_r or die "close in_r: $!"; my $out = ''; @@ -41,8 +47,8 @@ sub check_netrc ($) { } sub fill { - my ($self) = @_; - my $out_r = run($self, 'fill'); + my ($self, $lei) = @_; + my $out_r = run($self, 'fill', $lei); while (<$out_r>) { chomp; return if $_ eq ''; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 1fa9f751..1e4c36d0 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -173,7 +173,11 @@ our %CMD = ( # sorted in order of importance/use: qw(stdin| offset=i recursive|r exclude=s include|I=s format|f=s kw|keywords|flags!), ], - +'convert' => [ 'LOCATION...|--stdin', + 'one-time conversion from URL or filesystem to another format', + qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s quiet|q + kw|keywords|flags!), + ], 'config' => [ '[...]', sub { 'git-config(1) wrapper for '._config_path($_[0]); }, qw(config-file|system|global|file|f=s), # for conflict detection @@ -320,7 +324,7 @@ my %CONFIG_KEYS = ( 'leistore.dir' => 'top-level storage location', ); -my @WQ_KEYS = qw(lxs l2m imp mrr); # internal workers +my @WQ_KEYS = qw(lxs l2m imp mrr cnv auth); # internal workers # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE sub x_it ($$) { @@ -391,18 +395,19 @@ sub fail ($$;$) { undef; } -sub check_input_format ($;$) { - my ($self, $files) = @_; - my $fmt = $self->{opt}->{'format'}; +sub check_input_format ($;$$) { + my ($self, $files, $opt_key) = @_; + $opt_key //= 'format'; + my $fmt = $self->{opt}->{$opt_key}; if (!$fmt) { my $err = $files ? 
"regular file(s):\n@$files" : '--stdin'; - return fail($self, "--format unset for $err"); + return fail($self, "--$opt_key unset for $err"); } return 1 if $fmt eq 'eml'; # XXX: should this handle {gz,bz2,xz}? that's currently in LeiToMail require PublicInbox::MboxReader; PublicInbox::MboxReader->can($fmt) || - fail($self, "--format=$fmt unrecognized"); + fail($self, "--$opt_key=$fmt unrecognized"); } sub out ($;@) { @@ -445,6 +450,7 @@ sub lei_atfork_child { } else { delete $self->{0}; } + delete @$self{qw(cnv)}; for (delete @$self{qw(3 sock old_1 au_done)}) { close($_) if defined($_); } @@ -626,6 +632,11 @@ sub lei_import { PublicInbox::LeiImport->call(@_); } +sub lei_convert { + require PublicInbox::LeiConvert; + PublicInbox::LeiConvert->call(@_); +} + sub lei_init { my ($self, $dir) = @_; my $cfg = _lei_cfg($self, 1); @@ -770,6 +781,13 @@ sub start_mua { delete $self->{opt}->{verbose}; } +sub send_exec_cmd { # tell script/lei to execute a command + my ($self, $io, $cmd, $env) = @_; + my $sock = $self->{sock} // die 'lei client gone'; + my $fds = [ map { fileno($_) } @$io ]; + $send_cmd->($sock, $fds, exec_buf($cmd, $env), MSG_EOR); +} + sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail my ($self) = @_; my $alerts = $self->{opt}->{alert} // return; @@ -813,10 +831,9 @@ sub start_pager { pipe(my ($r, $wpager)) or return warn "pipe: $!"; my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} }; my $pgr = [ undef, @$rdr{1, 2} ]; - if (my $sock = $self->{sock}) { # lei(1) process runs it + if ($self->{sock}) { # lei(1) process runs it delete @$new_env{keys %$env}; # only set iff unset - my $fds = [ map { fileno($_) } @$rdr{0..2} ]; - $send_cmd->($sock, $fds, exec_buf([$pager], $new_env), MSG_EOR); + send_exec_cmd($self, [ @$rdr{0..2} ], [$pager], $new_env); } elsif ($self->{oneshot}) { my $cmd = [$pager]; $self->{"pid.$self.$$"}->{spawn($cmd, $new_env, $rdr)} = $cmd; @@ -920,6 +937,7 @@ sub event_step { sub event_step_init { my ($self) = 
@_; + return if $self->{-event_init_done}++; if (my $sock = $self->{sock}) { # using DS->EventLoop $self->SUPER::new($sock, EPOLLIN|EPOLLET); } diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm new file mode 100644 index 00000000..88310874 --- /dev/null +++ b/lib/PublicInbox/LeiAuth.pm @@ -0,0 +1,80 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# Authentication worker for anything that needs auth for read/write IMAP +# (eventually for read-only NNTP access) +package PublicInbox::LeiAuth; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC); +use PublicInbox::PktOp qw(pkt_do); +use PublicInbox::NetReader; + +sub nrd_merge { + my ($lei, $nrd_new) = @_; + if ($lei->{pkt_op_p}) { # from lei_convert worker + pkt_do($lei->{pkt_op_p}, 'nrd_merge', $nrd_new); + } else { # single lei-daemon consumer + my $self = $lei->{auth} or return; # client disconnected + my $nrd = $self->{nrd}; + %$nrd = (%$nrd, %$nrd_new); + } +} + +sub do_auth { # called via wq_io_do + my ($self) = @_; + my ($lei, $nrd) = @$self{qw(lei nrd)}; + $nrd->imap_common_init($lei); + nrd_merge($lei, $nrd); # tell lei-daemon updated auth info +} + +sub do_finish_auth { # dwaitpid callback + my ($arg, $pid) = @_; + my ($self, $lei, $post_auth_cb, @args) = @$arg; + $? ? $lei->dclose : $post_auth_cb->(@args); +} + +sub auth_eof { + my ($lei, $post_auth_cb, @args) = @_; + my $self = delete $lei->{auth} or return; + $self->wq_wait_old(\&do_finish_auth, $lei, $post_auth_cb, @args); +} + +sub auth_start { + my ($self, $lei, $post_auth_cb, @args) = @_; + my $ops = { + '!' 
=> [ $lei->can('fail_handler'), $lei ], + '|' => [ $lei->can('sigpipe_handler'), $lei ], + 'x_it' => [ $lei->can('x_it'), $lei ], + 'child_error' => [ $lei->can('child_error'), $lei ], + 'nrd_merge' => [ \&nrd_merge, $lei ], + '' => [ \&auth_eof, $lei, $post_auth_cb, @args ], + }; + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + $self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei}); + my $op = delete $lei->{pkt_op_c}; + delete $lei->{pkt_op_p}; + $self->wq_io_do('do_auth', []); + $self->wq_close(1); + $lei->event_step_init; # wait for shutdowns + if ($lei->{oneshot}) { + while ($op->{sock}) { $op->event_step } + } +} + +sub ipc_atfork_child { + my ($self) = @_; + # prevent {sock} from being closed in lei_atfork_child: + my $s = delete $self->{lei}->{sock}; + delete $self->{lei}->{auth}; # drop circular ref + $self->{lei}->lei_atfork_child; + $self->{lei}->{sock} = $s if $s; + $self->SUPER::ipc_atfork_child; +} + +sub new { + my ($cls, $nrd) = @_; + bless { nrd => $nrd }, $cls; +} + +1; diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm new file mode 100644 index 00000000..44d5131b --- /dev/null +++ b/lib/PublicInbox/LeiConvert.pm @@ -0,0 +1,149 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# front-end for the "lei convert" sub-command +package PublicInbox::LeiConvert; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC); +use PublicInbox::Eml; +use PublicInbox::InboxWritable qw(eml_from_path); +use PublicInbox::PktOp; +use PublicInbox::LeiStore; +use PublicInbox::LeiOverview; + +sub mbox_cb { + my ($eml, $self) = @_; + my @kw = PublicInbox::LeiStore::mbox_keywords($eml); + $eml->header_set($_) for qw(Status X-Status); + $self->{wcb}->(undef, { kw => \@kw }, $eml); +} + +sub imap_cb { # ->imap_each + my ($url, $uid, $kw, $eml, $self) = @_; + $self->{wcb}->(undef, { kw => $kw }, $eml); +} + +sub mdir_cb { + my 
($kw, $eml, $self) = @_; + $self->{wcb}->(undef, { kw => $kw }, $eml); +} + +sub do_convert { # via wq_do + my ($self) = @_; + my $lei = $self->{lei}; + my $in_fmt = $lei->{opt}->{'in-format'}; + if (my $stdin = delete $self->{0}) { + PublicInbox::MboxReader->$in_fmt($stdin, \&mbox_cb, $self); + } + for my $input (@{$self->{inputs}}) { + my $ifmt = lc($in_fmt // ''); + if ($input =~ m!\A(?:imap|nntp)s?://!) { # TODO: nntp + $lei->{nrd}->imap_each($input, \&imap_cb, $self); + next; + } elsif ($input =~ s!\A([a-z0-9]+):!!i) { + $ifmt = lc $1; + } + if (-f $input) { + open my $fh, '<', $input or + return $lei->fail("open $input: $!"); + PublicInbox::MboxReader->$ifmt($fh, \&mbox_cb, $self); + } elsif (-d _) { + PublicInbox::MdirReader::maildir_each_eml($input, + \&mdir_cb, $self); + } else { + die "BUG: $input unhandled"; # should've failed earlier + } + } + delete $lei->{1}; + delete $self->{wcb}; # commit +} + +sub convert_start { + my ($lei) = @_; + my $ops = { + '!' => [ $lei->can('fail_handler'), $lei ], + '|' => [ $lei->can('sigpipe_handler'), $lei ], + 'x_it' => [ $lei->can('x_it'), $lei ], + 'child_error' => [ $lei->can('child_error'), $lei ], + '' => [ $lei->can('dclose'), $lei ], + }; + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + my $self = $lei->{cnv}; + $self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei}); + my $op = delete $lei->{pkt_op_c}; + delete $lei->{pkt_op_p}; + $self->wq_io_do('do_convert', []); + $self->wq_close(1); + $lei->event_step_init; # wait for shutdowns + if ($lei->{oneshot}) { + while ($op->{sock}) { $op->event_step } + } +} + +sub call { # the main "lei convert" method + my ($cls, $lei, @inputs) = @_; + my $opt = $lei->{opt}; + $opt->{kw} //= 1; + my $self = $lei->{cnv} = bless {}, $cls; + my $in_fmt = $opt->{'in-format'}; + my ($nrd, @f, @d); + $opt->{dedupe} //= 'none'; + my $ovv = PublicInbox::LeiOverview->new($lei, 'out-format'); + $lei->{l2m} or return + $lei->fail("output not 
specified or is not a mail destination"); + $opt->{augment} = 1 unless $ovv->{dst} eq '/dev/stdout'; + if ($opt->{stdin}) { + @inputs and return $lei->fail("--stdin and @inputs do not mix"); + $lei->check_input_format(undef, 'in-format') or return; + $self->{0} = $lei->{0}; + } + # e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX + for my $input (@inputs) { + my $input_path = $input; + if ($input =~ m!\A(?:imap|nntp)s?://!i) { + require PublicInbox::NetReader; + $nrd //= PublicInbox::NetReader->new; + $nrd->add_url($input); + } elsif ($input_path =~ s/\A([a-z0-9]+)://is) { + my $ifmt = lc $1; + if (($in_fmt // $ifmt) ne $ifmt) { + return $lei->fail(<<""); +--in-format=$in_fmt and `$ifmt:' conflict + + } + } elsif (-f $input) { push @f, $input } + elsif (-d _) { push @d, $input } + else { return $lei->fail("Unable to handle $input") } + } + if (@f) { $lei->check_input_format(\@f, 'in-format') or return } + if (@d) { # TODO: check for MH vs Maildir, here + require PublicInbox::MdirReader; + } + $self->{inputs} = \@inputs; + return convert_start($lei) if !$nrd; + + if (my $err = $nrd->errors) { + return $lei->fail($err); + } + $nrd->{quiet} = $opt->{quiet}; + $lei->{nrd} = $nrd; + require PublicInbox::LeiAuth; + my $auth = $lei->{auth} = PublicInbox::LeiAuth->new($nrd); + $auth->auth_start($lei, \&convert_start, $lei); +} + +sub ipc_atfork_child { + my ($self) = @_; + my $lei = $self->{lei}; + $lei->lei_atfork_child; + my $l2m = delete $lei->{l2m}; + $l2m->pre_augment($lei); + $l2m->do_augment($lei); + $l2m->post_augment($lei); + $self->{wcb} = $l2m->write_cb($lei); + $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); + $self->SUPER::ipc_atfork_child; +} + +1; diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm index 2114c0e8..5fec9384 100644 --- a/lib/PublicInbox/LeiDedupe.pm +++ b/lib/PublicInbox/LeiDedupe.pm @@ -127,7 +127,7 @@ sub prepare_dedupe { sub pause_dedupe { my ($self) = @_; - my $skv = $self->[0]; + my $skv = $self->[0] or 
return; $skv->dbh_release; delete($skv->{dbh}) if $skv; } diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index c820f0d7..3169bae6 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -51,18 +51,19 @@ sub detect_fmt ($$) { } sub new { - my ($class, $lei) = @_; + my ($class, $lei, $ofmt_key) = @_; my $opt = $lei->{opt}; my $dst = $opt->{output} // '-'; $dst = '/dev/stdout' if $dst eq '-'; + $ofmt_key //= 'format'; - my $fmt = $opt->{'format'}; + my $fmt = $opt->{$ofmt_key}; $fmt = lc($fmt) if defined $fmt; if ($dst =~ s/\A([a-z0-9]+)://is) { # e.g. Maildir:/home/user/Mail/ my $ofmt = lc $1; $fmt //= $ofmt; return $lei->fail(<<"") if $fmt ne $ofmt; ---format=$fmt and --output=$ofmt conflict +--$ofmt_key=$fmt and --output=$ofmt conflict } $fmt //= 'json' if $dst eq '/dev/stdout'; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index e3e512be..f0adc44f 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -437,7 +437,7 @@ sub _do_augment_mbox { $dedupe->pause_dedupe if $dedupe; } -sub pre_augment { # fast (1 disk seek), runs in main daemon +sub pre_augment { # fast (1 disk seek), runs in same process as post_augment my ($self, $lei) = @_; # _pre_augment_maildir, _pre_augment_mbox my $m = "_pre_augment_$self->{base_type}"; @@ -451,7 +451,8 @@ sub do_augment { # slow, runs in wq worker $self->$m($lei); } -sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon +# fast (spawn compressor or mkdir), runs in same process as pre_augment +sub post_augment { my ($self, $lei, @args) = @_; # _post_augment_maildir, _post_augment_mbox my $m = "_post_augment_$self->{base_type}"; diff --git a/lib/PublicInbox/MdirReader.pm b/lib/PublicInbox/MdirReader.pm index e0ff676d..5fa534f5 100644 --- a/lib/PublicInbox/MdirReader.pm +++ b/lib/PublicInbox/MdirReader.pm @@ -7,6 +7,7 @@ package PublicInbox::MdirReader; use strict; use v5.10.1; +use 
PublicInbox::InboxWritable qw(eml_from_path); # returns Maildir flags from a basename ('' for no flags, undef for invalid) sub maildir_basename_flags { @@ -36,4 +37,29 @@ sub maildir_each_file ($$;@) { } } +my %c2kw = ('D' => 'draft', F => 'flagged', R => 'answered', S => 'seen'); + +sub maildir_each_eml ($$;@) { + my ($dir, $cb, @arg) = @_; + $dir .= '/' unless substr($dir, -1) eq '/'; + my $pfx = "$dir/new/"; + if (opendir(my $dh, $pfx)) { + while (defined(my $bn = readdir($dh))) { + next if substr($bn, 0, 1) eq '.'; + my @f = split(/:/, $bn, -1); + next if scalar(@f) != 1; + my $eml = eml_from_path($pfx.$bn) or next; + $cb->([], $eml, @arg); + } + } + $pfx = "$dir/cur/"; + opendir my $dh, $pfx or return; + while (defined(my $bn = readdir($dh))) { + my $fl = maildir_basename_flags($bn) // next; + my $eml = eml_from_path($pfx.$bn) or next; + my @kw = sort(map { $c2kw{$_} // () } split(//, $fl)); + $cb->(\@kw, $eml, @arg); + } +} + 1; diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index 1d053425..ad8c18d0 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -5,7 +5,8 @@ package PublicInbox::NetReader; use strict; use v5.10.1; -use parent qw(Exporter); +use parent qw(Exporter PublicInbox::IPC); +use PublicInbox::Eml; # TODO: trim this down, this is huge our @EXPORT = qw(uri_new uri_scheme uri_section @@ -33,7 +34,7 @@ sub uri_section ($) { sub auth_anon_cb { '' }; # for Mail::IMAPClient::Authcallback sub mic_for { # mic = Mail::IMAPClient - my ($self, $url, $mic_args) = @_; + my ($self, $url, $mic_args, $lei) = @_; require PublicInbox::URIimap; my $uri = PublicInbox::URIimap->new($url); require PublicInbox::GitCredential; @@ -74,21 +75,26 @@ sub mic_for { # mic = Mail::IMAPClient } if ($cred) { $cred->check_netrc unless defined $cred->{password}; - $cred->fill; # may prompt user here + $cred->fill($lei); # may prompt user here $mic->User($mic_arg->{User} = $cred->{username}); $mic->Password($mic_arg->{Password} = 
$cred->{password}); } else { # AUTH=ANONYMOUS $mic->Authmechanism($mic_arg->{Authmechanism} = 'ANONYMOUS'); - $mic->Authcallback($mic_arg->{Authcallback} = \&auth_anon_cb); + $mic_arg->{Authcallback} = 'auth_anon_cb'; + $mic->Authcallback(\&auth_anon_cb); } + my $err; if ($mic->login && $mic->IsAuthenticated) { # success! keep IMAPClient->new arg in case we get disconnected $self->{mic_arg}->{uri_section($uri)} = $mic_arg; } else { - warn "E: <$url> LOGIN: $@\n"; + $err = "E: <$url> LOGIN: $@\n"; $mic = undef; } $cred->run($mic ? 'approve' : 'reject') if $cred; + if ($err) { + $lei ? $lei->fail($err) : warn($err); + } $mic; } @@ -139,8 +145,8 @@ E: <$url> STARTTLS requested and failed $nn; } -sub nn_for ($$$) { # nn = Net::NNTP - my ($self, $url, $nn_args) = @_; +sub nn_for ($$$;$) { # nn = Net::NNTP + my ($self, $url, $nn_args, $lei) = @_; my $uri = uri_new($url); my $sec = uri_section($uri); my $nntp_opt = $self->{nntp_opt}->{$sec} //= {}; @@ -170,7 +176,7 @@ sub nn_for ($$$) { # nn = Net::NNTP my $nn = nn_new($nn_arg, $nntp_opt, $url); if ($cred) { - $cred->fill; # may prompt user here + $cred->fill($lei); # may prompt user here if ($nn->authinfo($u, $p)) { push @{$nntp_opt->{-postconn}}, [ 'authinfo', $u, $p ]; } else { @@ -240,14 +246,15 @@ sub cfg_bool ($$$) { } # flesh out common IMAP-specific data structures -sub imap_common_init ($) { - my ($self) = @_; +sub imap_common_init ($;$) { + my ($self, $lei) = @_; + $self->{quiet} = 1 if $lei && $lei->{opt}->{quiet}; eval { require PublicInbox::IMAPClient } or die "Mail::IMAPClient is required for IMAP:\n$@\n"; eval { require PublicInbox::IMAPTracker } or die "DBD::SQLite is required for IMAP\n:$@\n"; require PublicInbox::URIimap; - my $cfg = $self->{pi_cfg}; + my $cfg = $self->{pi_cfg} // $lei->_lei_cfg; my $mic_args = {}; # scheme://authority => Mail:IMAPClient arg for my $url (@{$self->{imap_order}}) { my $uri = PublicInbox::URIimap->new($url); @@ -275,7 +282,8 @@ sub imap_common_init ($) { my $mics = {}; # 
schema://authority => IMAPClient obj for my $url (@{$self->{imap_order}}) { my $uri = PublicInbox::URIimap->new($url); - $mics->{uri_section($uri)} //= mic_for($self, $url, $mic_args); + my $sec = uri_section($uri); + $mics->{$sec} //= mic_for($self, $url, $mic_args, $lei); } $mics; } @@ -294,9 +302,140 @@ sub errors { if (my $u = $self->{unsupported_url}) { return "Unsupported URL(s): @$u"; } + if ($self->{imap_order}) { + eval { require PublicInbox::IMAPClient } or + die "Mail::IMAPClient is required for IMAP:\n$@\n"; + } undef; } +my %IMAPflags2kw = ( + '\Seen' => 'seen', + '\Answered' => 'answered', + '\Flagged' => 'flagged', + '\Draft' => 'draft', +); + +sub _imap_do_msg ($$$$$) { + my ($self, $url, $uid, $raw, $flags) = @_; + # our target audience expects LF-only, save storage + $$raw =~ s/\r\n/\n/sg; + my $kw = []; + for my $f (split(/ /, $flags)) { + my $k = $IMAPflags2kw{$f} // next; # TODO: X-Label? + push @$kw, $k; + } + my ($eml_cb, @args) = @{$self->{eml_each}}; + $eml_cb->($url, $uid, $kw, PublicInbox::Eml->new($raw), @args); +} + +sub _imap_fetch_all ($$$) { + my ($self, $mic, $url) = @_; + my $uri = PublicInbox::URIimap->new($url); + my $sec = uri_section($uri); + my $mbx = $uri->mailbox; + $mic->Clear(1); # trim results history + $mic->examine($mbx) or return "E: EXAMINE $mbx ($sec) failed: $!"; + my ($r_uidval, $r_uidnext); + for ($mic->Results) { + /^\* OK \[UIDVALIDITY ([0-9]+)\].*/ and $r_uidval = $1; + /^\* OK \[UIDNEXT ([0-9]+)\].*/ and $r_uidnext = $1; + last if $r_uidval && $r_uidnext; + } + $r_uidval //= $mic->uidvalidity($mbx) // + return "E: $url cannot get UIDVALIDITY"; + $r_uidnext //= $mic->uidnext($mbx) // + return "E: $url cannot get UIDNEXT"; + my $itrk = $self->{incremental} ? + PublicInbox::IMAPTracker->new($url) : 0; + my ($l_uidval, $l_uid) = $itrk ? $itrk->get_last : (); + $l_uidval //= $r_uidval; # first time + $l_uid //= 1; + if ($l_uidval != $r_uidval) { + return "E: $url UIDVALIDITY mismatch\n". 
+ "E: local=$l_uidval != remote=$r_uidval"; + } + my $r_uid = $r_uidnext - 1; + if ($l_uid != 1 && $l_uid > $r_uid) { + return "E: $url local UID exceeds remote ($l_uid > $r_uid)\n". + "E: $url strangely, UIDVALIDLITY matches ($l_uidval)\n"; + } + return if $l_uid >= $r_uid; # nothing to do + + warn "# $url fetching UID $l_uid:$r_uid\n" unless $self->{quiet}; + $mic->Uid(1); # the default, we hope + my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1; + my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK'; + my $key = $req; + $key =~ s/\.PEEK//; + my ($uids, $batch); + my $err; + do { + # I wish "UID FETCH $START:*" could work, but: + # 1) servers do not need to return results in any order + # 2) Mail::IMAPClient doesn't offer a streaming API + $uids = $mic->search("UID $l_uid:*") or + return "E: $url UID SEARCH $l_uid:* error: $!"; + return if scalar(@$uids) == 0; + + # RFC 3501 doesn't seem to indicate order of UID SEARCH + # responses, so sort it ourselves. Order matters so + # IMAPTracker can store the newest UID. + @$uids = sort { $a <=> $b } @$uids; + + # Did we actually get new messages? + return if $uids->[0] < $l_uid; + + $l_uid = $uids->[-1] + 1; # for next search + my $last_uid; + my $n = $self->{max_batch}; + while (scalar @$uids) { + my @batch = splice(@$uids, 0, $bs); + $batch = join(',', @batch); + local $0 = "UID:$batch $mbx $sec"; + my $r = $mic->fetch_hash($batch, $req, 'FLAGS'); + unless ($r) { # network error? 
+ $err = "E: $url UID FETCH $batch error: $!"; + last; + } + for my $uid (@batch) { + # messages get deleted, so holes appear + my $per_uid = delete $r->{$uid} // next; + my $raw = delete($per_uid->{$key}) // next; + _imap_do_msg($self, $url, $uid, \$raw, + $per_uid->{FLAGS}); + $last_uid = $uid; + last if $self->{quit}; + } + last if $self->{quit}; + } + $itrk->update_last($r_uidval, $last_uid) if $itrk; + } until ($err || $self->{quit}); + $err; +} + +sub imap_each { + my ($self, $url, $eml_cb, @args) = @_; + my $uri = PublicInbox::URIimap->new($url); + my $sec = uri_section($uri); + my $mic_arg = $self->{mic_arg}->{$sec} or + die "BUG: no Mail::IMAPClient->new arg for $sec"; + local $0 = $uri->mailbox." $sec"; + my $cb_name = $mic_arg->{Authcallback}; + if (ref($cb_name) ne 'CODE') { + $mic_arg->{Authcallback} = $self->can($cb_name); + } + my $mic = PublicInbox::IMAPClient->new(%$mic_arg, Debug => 0); + my $err; + if ($mic && $mic->IsConnected) { + local $self->{eml_each} = [ $eml_cb, @args ]; + $err = _imap_fetch_all($self, $mic, $url); + } else { + $err = "E: not connected: $!"; + } + $mic; +} + sub new { bless {}, shift }; 1; diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index c5070cfd..3eb08e9f 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -462,10 +462,15 @@ our $lei = sub { sub lei (@) { $lei->(@_) } sub lei_ok (@) { - my $msg = ref($_[-1]) ? pop(@_) : undef; + my $msg = ref($_[-1]) eq 'SCALAR' ? pop(@_) : undef; + my $tmpdir = quotemeta(File::Spec->tmpdir); # filter out anything that looks like a path name for consistent logs - my @msg = grep(!m!\A/!, @_); - ok($lei->(@_), "lei @msg". ($msg ? " ($$msg)" : '')); + my @msg = ref($_[0]) eq 'ARRAY' ? @{$_[0]} : @_; + for (@msg) { + s!\A([a-z0-9]+://)[^/]+/!$1\$HOST_PORT/! || + s!$tmpdir\b/(?:[^/]+/)?!\$TMPDIR/!; + } + ok(lei(@_), "lei @msg". ($msg ? 
" ($$msg)" : '')) or diag $lei_err; } sub json_utf8 () { diff --git a/t/lei-convert.t b/t/lei-convert.t new file mode 100644 index 00000000..a319c4ad --- /dev/null +++ b/t/lei-convert.t @@ -0,0 +1,36 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; +use Digest::SHA +require_git 2.6; +require_mods(qw(DBD::SQLite Search::Xapian)); +my ($tmpdir, $for_destroy) = tmpdir; +my $sock = tcp_server; +my $cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/1", "--stderr=$tmpdir/2" ]; +my ($ro_home, $cfg_path) = setup_public_inboxes; +my $env = { PI_CONFIG => $cfg_path }; +my $td = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-imapd: $?"); +my $host_port = tcp_host_port($sock); +undef $sock; +test_lei({ tmpdir => $tmpdir }, sub { + my $dig = Digest::SHA->new(256); + lei_ok('convert', '-o', "mboxrd:$tmpdir/foo.mboxrd", + "imap://$host_port/t.v2.0"); + ok(-f "$tmpdir/foo.mboxrd", 'mboxrd created'); + $dig->addfile("$tmpdir/foo.mboxrd"); + my $foo = $dig->digest; + lei_ok('convert', '-o', "$tmpdir/md", "mboxrd:$tmpdir/foo.mboxrd"); + ok(-d "$tmpdir/md", 'Maildir created'); + lei_ok('convert', '-o', "mboxrd:$tmpdir/bar.mboxrd", "$tmpdir/md"); + $dig->addfile("$tmpdir/bar.mboxrd"); + my $bar = $dig->digest; + is($foo, $bar, 'mboxrd round-tripped through Maildir'); + open my $in, '<', "$tmpdir/bar.mboxrd" or BAIL_OUT; + my $rdr = { 0 => $in, 1 => \(my $out), 2 => \$lei_err }; + lei_ok([qw(convert --stdin -F mboxrd -o mboxrd:/dev/stdout)], + undef, $rdr); + $dig->add($out); + is($foo, $dig->digest, 'mboxrd round-tripped --stdin => stdout'); +}); +done_testing; diff --git a/t/net_reader-imap.t b/t/net_reader-imap.t new file mode 100644 index 00000000..eea8b0fd --- /dev/null +++ b/t/net_reader-imap.t @@ -0,0 +1,40 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ 
<https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; +require_git 2.6; +require_mods(qw(DBD::SQLite Search::Xapian)); +my ($tmpdir, $for_destroy) = tmpdir; +my ($ro_home, $cfg_path) = setup_public_inboxes; +my $cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/1", "--stderr=$tmpdir/2" ]; +my $sock = tcp_server; +my $env = { PI_CONFIG => $cfg_path }; +my $td = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT "-imapd: $?"; +my ($host, $port) = tcp_host_port $sock; +require_ok 'PublicInbox::NetReader'; +my $nrd = PublicInbox::NetReader->new; +$nrd->add_url(my $url = "imap://$host:$port/t.v2.0"); +is($nrd->errors, undef, 'no errors'); +$nrd->{pi_cfg} = PublicInbox::Config->new($cfg_path); +$nrd->imap_common_init; +$nrd->{quiet} = 1; +my (%eml, %urls, %args, $nr, @w); +local $SIG{__WARN__} = sub { push(@w, @_) }; +$nrd->imap_each($url, sub { + my ($u, $uid, $kw, $eml, $arg) = @_; + ++$urls{$u}; + ++$args{$arg}; + like($uid, qr/\A[0-9]+\z/, 'got digit UID '.$uid); + ++$eml{ref($eml)}; + ++$nr; +}, 'blah'); +is(scalar(@w), 0, 'no warnings'); +ok($nr, 'got some emails'); +is($eml{'PublicInbox::Eml'}, $nr, 'got expected Eml objects'); +is(scalar keys %eml, 1, 'only got Eml objects'); +is($urls{$url}, $nr, 'one URL expected number of times'); +is(scalar keys %urls, 1, 'only got one URL'); +is($args{blah}, $nr, 'got arg expected number of times'); +is(scalar keys %args, 1, 'only got one arg'); + +done_testing;
This makes "lei import" more similar to "lei convert" and allows importing from disparate sources simultaneously. We'll also fix some ->child_error usage errors and make the style of the code more similar to the "lei convert" code. --- MANIFEST | 1 + lib/PublicInbox/LeiImport.pm | 126 ++++++++++++++++++++++++----------- t/lei-import-imap.t | 28 ++++++++ t/lei-import-maildir.t | 4 +- t/lei_to_mail.t | 10 +++ 5 files changed, 127 insertions(+), 42 deletions(-) create mode 100644 t/lei-import-imap.t diff --git a/MANIFEST b/MANIFEST index 4f146771..19f73356 100644 --- a/MANIFEST +++ b/MANIFEST @@ -365,6 +365,7 @@ t/kqnotify.t t/lei-convert.t t/lei-daemon.t t/lei-externals.t +t/lei-import-imap.t t/lei-import-maildir.t t/lei-import.t t/lei-mirror.t diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 32f3a467..4d225262 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -29,7 +29,7 @@ sub import_done { # EOF callback for main daemon $imp->wq_wait_old(\&import_done_wait, $lei); } -sub do_import { +sub import_start { my ($lei) = @_; my $ops = { '!' 
=> [ $lei->can('fail_handler'), $lei ], @@ -39,7 +39,7 @@ sub do_import { }; ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); my $self = $lei->{imp}; - my $j = $lei->{opt}->{jobs} // scalar(@{$self->{argv}}) || 1; + my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1; if (my $nrd = $lei->{nrd}) { # $j = $nrd->net_concurrency($j); TODO } else { @@ -50,8 +50,8 @@ sub do_import { my $op = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; $self->wq_io_do('import_stdin', []) if $self->{0}; - for my $x (@{$self->{argv}}) { - $self->wq_io_do('import_path_url', [], $x); + for my $input (@{$self->{inputs}}) { + $self->wq_io_do('import_path_url', [], $input); } $self->wq_close(1); $lei->event_step_init; # wait for shutdowns @@ -61,60 +61,88 @@ sub do_import { } sub call { # the main "lei import" method - my ($cls, $lei, @argv) = @_; + my ($cls, $lei, @inputs) = @_; my $sto = $lei->_lei_store(1); $sto->write_prepare($lei); + my ($nrd, @f, @d); $lei->{opt}->{kw} //= 1; - my $self = $lei->{imp} = bless { argv => \@argv }, $cls; + my $self = $lei->{imp} = bless { inputs => \@inputs }, $cls; if ($lei->{opt}->{stdin}) { - @argv and return - $lei->fail("--stdin and locations (@argv) do not mix"); + @inputs and return $lei->fail("--stdin and @inputs do not mix"); $lei->check_input_format or return; $self->{0} = $lei->{0}; - } else { - my @f; - for my $x (@argv) { - if (-f $x) { push @f, $x } - elsif (-d _) { require PublicInbox::MdirReader } - else { - require PublicInbox::NetReader; - $lei->{nrd} //= PublicInbox::NetReader->new; - $lei->{nrd}->add_url($x); + } + + # TODO: do we need --format for non-stdin? + my $fmt = $lei->{opt}->{'format'}; + # e.g. 
Maildir:/home/user/Mail/ or imaps://example.com/INBOX + for my $input (@inputs) { + my $input_path = $input; + if ($input =~ m!\A(?:imap|nntp)s?://!i) { + require PublicInbox::NetReader; + $nrd //= PublicInbox::NetReader->new; + $nrd->add_url($input); + } elsif ($input_path =~ s/\A([a-z0-9]+)://is) { + my $ifmt = lc $1; + if (($fmt // $ifmt) ne $ifmt) { + return $lei->fail(<<""); +--format=$fmt and `$ifmt:' conflict + } - } - if (@f) { $lei->check_input_format(\@f) or return } - if ($lei->{nrd} && (my @err = $lei->{nrd}->errors)) { - return $lei->fail(@err); - } + if (-f $input_path) { + require PublicInbox::MboxReader; + PublicInbox::MboxReader->can($ifmt) or return + $lei->fail("$ifmt not supported"); + } elsif (-d _) { + $ifmt eq 'maildir' or return + $lei->fail("$ifmt not supported"); + } else { return $lei->fail("Unable to handle $input_path") } + } elsif (-f $input) { push @f, $input + } elsif (-d _) { push @d, $input + } else { return $lei->fail("Unable to handle $input") } + } + if (@f) { $lei->check_input_format(\@f) or return } + if (@d) { # TODO: check for MH vs Maildir, here + require PublicInbox::MdirReader; } - do_import($lei); + $self->{inputs} = \@inputs; + return import_start($lei) if !$nrd; + + if (my $err = $nrd->errors) { + return $lei->fail($err); + } + $nrd->{quiet} = $lei->{opt}->{quiet}; + $lei->{nrd} = $nrd; + require PublicInbox::LeiAuth; + my $auth = $lei->{auth} = PublicInbox::LeiAuth->new($nrd); + $auth->auth_start($lei, \&import_start, $lei); } sub ipc_atfork_child { my ($self) = @_; + delete $self->{lei}->{imp}; # drop circular ref $self->{lei}->lei_atfork_child; $self->SUPER::ipc_atfork_child; } sub _import_fh { - my ($lei, $fh, $x) = @_; + my ($lei, $fh, $input, $ifmt) = @_; my $set_kw = $lei->{opt}->{kw}; - my $fmt = $lei->{opt}->{'format'}; eval { - if ($fmt eq 'eml') { + if ($ifmt eq 'eml') { my $buf = do { local $/; <$fh> } // - return $lei->child_error(1 >> 8, <<""); -error reading $x: $! 
+ return $lei->child_error(1 << 8, <<""); +error reading $input: $! my $eml = PublicInbox::Eml->new(\$buf); _import_eml($eml, $lei->{sto}, $set_kw); } else { # some mbox (->can already checked in call); - my $cb = PublicInbox::MboxReader->can($fmt) // - die "BUG: bad fmt=$fmt"; + my $cb = PublicInbox::MboxReader->can($ifmt) // + die "BUG: bad fmt=$ifmt"; $cb->(undef, $fh, \&_import_eml, $lei->{sto}, $set_kw); } }; - $lei->child_error(1 >> 8, "<stdin>: $@") if $@; + $lei->child_error(1 << 8, "<stdin>: $@") if $@; } sub _import_maildir { # maildir_each_file cb @@ -122,27 +150,45 @@ sub _import_maildir { # maildir_each_file cb $sto->ipc_do('set_eml_from_maildir', $f, $set_kw); } +sub _import_imap { # imap_each cb + my ($url, $uid, $kw, $eml, $sto, $set_kw) = @_; + warn "$url $uid"; + $sto->ipc_do('set_eml', $eml, $set_kw ? @$kw : ()); +} + sub import_path_url { - my ($self, $x) = @_; + my ($self, $input) = @_; my $lei = $self->{lei}; + my $ifmt = lc($lei->{opt}->{'format'} // ''); # TODO auto-detect? - if (-f $x) { - open my $fh, '<', $x or return $lei->child_error(1 >> 8, <<""); -unable to open $x: $! + if ($input =~ m!\A(imap|nntp)s?://!i) { + $lei->{nrd}->imap_each($input, \&_import_imap, $lei->{sto}, + $lei->{opt}->{kw}); + return; + } elsif ($input =~ s!\A([a-z0-9]+):!!i) { + $ifmt = lc $1; + } + if (-f $input) { + open my $fh, '<', $input or return $lei->child_error(1 << 8, <<""); +unable to open $input: $! 
- _import_fh($lei, $fh, $x); - } elsif (-d _ && (-d "$x/cur" || -d "$x/new")) { - PublicInbox::MdirReader::maildir_each_file($x, + _import_fh($lei, $fh, $input, $ifmt); + } elsif (-d _ && (-d "$input/cur" || -d "$input/new")) { + return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir'; +$input appears to a be a maildir, not $ifmt +EOM + PublicInbox::MdirReader::maildir_each_file($input, \&_import_maildir, $lei->{sto}, $lei->{opt}->{kw}); } else { - $lei->fail("$x unsupported (TODO)"); + $lei->fail("$input unsupported (TODO)"); } } sub import_stdin { my ($self) = @_; - _import_fh($self->{lei}, $self->{0}, '<stdin>'); + my $lei = $self->{lei}; + _import_fh($lei, delete $self->{0}, '<stdin>', $lei->{opt}->{'format'}); } 1; diff --git a/t/lei-import-imap.t b/t/lei-import-imap.t new file mode 100644 index 00000000..ee308723 --- /dev/null +++ b/t/lei-import-imap.t @@ -0,0 +1,28 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; +require_git 2.6; +require_mods(qw(DBD::SQLite Search::Xapian)); +my ($ro_home, $cfg_path) = setup_public_inboxes; +my ($tmpdir, $for_destroy) = tmpdir; +my $sock = tcp_server; +my $cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/1", "--stderr=$tmpdir/2" ]; +my $env = { PI_CONFIG => $cfg_path }; +my $td = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-imapd: $?"); +my $host_port = tcp_host_port($sock); +undef $sock; +test_lei({ tmpdir => $tmpdir }, sub { + lei_ok(qw(q bytes:1..)); + my $out = json_utf8->decode($lei_out); + is_deeply($out, [ undef ], 'nothing imported, yet'); + lei_ok('import', "imap://$host_port/t.v2.0"); + lei_ok(qw(q bytes:1..)); + $out = json_utf8->decode($lei_out); + ok(scalar(@$out) > 1, 'got imported messages'); + is(pop @$out, undef, 'trailing JSON null element was null'); + my %r; + for (@$out) { $r{ref($_)}++ } + is_deeply(\%r, { 'HASH' => scalar(@$out) }, 'all hashes'); 
+}); +done_testing; diff --git a/t/lei-import-maildir.t b/t/lei-import-maildir.t index 5842e19e..d2b059ad 100644 --- a/t/lei-import-maildir.t +++ b/t/lei-import-maildir.t @@ -23,8 +23,8 @@ test_lei(sub { is_deeply($r2, $res, 'idempotent import'); rename("$md/cur/x:2,S", "$md/cur/x:2,SR") or BAIL_OUT "rename: $!"; - ok($lei->(qw(import), $md), 'import Maildir after +answered'); - ok($lei->(qw(q -d none s:boolean)), 'lei q after +answered'); + lei_ok('import', "maildir:$md", \'import Maildir after +answered'); + lei_ok(qw(q -d none s:boolean), \'lei q after +answered'); $res = json_utf8->decode($lei_out); like($res->[0]->{'s'}, qr/use boolean/, 'got expected result'); is_deeply($res->[0]->{kw}, ['answered', 'seen'], 'keywords set'); diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index 6a571660..72b90700 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -139,6 +139,16 @@ test_lei(sub { is($res->[1], undef, 'only one result'); }); +test_lei(sub { + lei_ok('import', "$mbox:$fn", \'imported mbox:/path') or diag $lei_err; + lei_ok(qw(q s:x), \'lei q works') or diag $lei_err; + my $res = json_utf8->decode($lei_out); + my $x = $res->[0]; + is($x->{'s'}, 'x', 'subject imported') or diag $lei_out; + is_deeply($x->{'kw'}, ['seen'], 'kw imported') or diag $lei_out; + is($res->[1], undef, 'only one result'); +}); + for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? my $zsfx2cmd = PublicInbox::LeiToMail->can('zsfx2cmd'); SKIP: {
The backends for "lei add-external --mirror", "lei convert", and "lei import" all share a similar pattern for spawning background workers. Hoist out the common parts to slim down our code base a bit. The LeiXSearch and LeiToMail workers for "lei q" remain the odd ducks due to the deep pipelining and parallelization. --- lib/PublicInbox/LEI.pm | 19 +++++++++++++++++++ lib/PublicInbox/LeiAuth.pm | 17 +++-------------- lib/PublicInbox/LeiConvert.pm | 22 +++++----------------- lib/PublicInbox/LeiImport.pm | 19 ++++--------------- lib/PublicInbox/LeiMirror.pm | 19 ++++--------------- 5 files changed, 35 insertions(+), 61 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 1e4c36d0..0b4bc20e 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -468,6 +468,25 @@ sub lei_atfork_child { $current_lei = $persist ? undef : $self; # for SIG{__WARN__} } +sub workers_start { + my ($lei, $wq, $ident, $jobs, $ops) = @_; + $ops = { + '!' => [ $lei->can('fail_handler'), $lei ], + '|' => [ $lei->can('sigpipe_handler'), $lei ], + 'x_it' => [ $lei->can('x_it'), $lei ], + 'child_error' => [ $lei->can('child_error'), $lei ], + %$ops + }; + require PublicInbox::PktOp; + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei }); + delete $lei->{pkt_op_p}; + my $op = delete $lei->{pkt_op_c}; + $lei->event_step_init; + # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op + $lei->{oneshot} ? $op : undef; +} + sub _help { require PublicInbox::LeiHelp; PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC); diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm index 88310874..7210af99 100644 --- a/lib/PublicInbox/LeiAuth.pm +++ b/lib/PublicInbox/LeiAuth.pm @@ -42,24 +42,13 @@ sub auth_eof { sub auth_start { my ($self, $lei, $post_auth_cb, @args) = @_; - my $ops = { - '!'
=> [ $lei->can('fail_handler'), $lei ], - '|' => [ $lei->can('sigpipe_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], + my $op = $lei->workers_start($self, 'auth', 1, { 'nrd_merge' => [ \&nrd_merge, $lei ], '' => [ \&auth_eof, $lei, $post_auth_cb, @args ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); - $self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + }); $self->wq_io_do('do_auth', []); $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 44d5131b..6dd137bc 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -8,7 +8,6 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::InboxWritable qw(eml_from_path); -use PublicInbox::PktOp; use PublicInbox::LeiStore; use PublicInbox::LeiOverview; @@ -59,26 +58,15 @@ sub do_convert { # via wq_do delete $self->{wcb}; # commit } -sub convert_start { +sub convert_start { # LeiAuth->auth_start callback my ($lei) = @_; - my $ops = { - '!' 
=> [ $lei->can('fail_handler'), $lei ], - '|' => [ $lei->can('sigpipe_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - '' => [ $lei->can('dclose'), $lei ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); my $self = $lei->{cnv}; - $self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + my $op = $lei->workers_start($self, 'lei_convert', 1, { + '' => [ $lei->can('dclose'), $lei ] + }); $self->wq_io_do('do_convert', []); $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub call { # the main "lei convert" method diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 4d225262..a0d79282 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -8,7 +8,6 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::InboxWritable qw(eml_from_path); -use PublicInbox::PktOp; sub _import_eml { # MboxReader callback my ($eml, $sto, $set_kw) = @_; @@ -31,13 +30,6 @@ sub import_done { # EOF callback for main daemon sub import_start { my ($lei) = @_; - my $ops = { - '!' 
=> [ $lei->can('fail_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - '' => [ \&import_done, $lei ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); my $self = $lei->{imp}; my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1; if (my $nrd = $lei->{nrd}) { @@ -46,18 +38,15 @@ sub import_start { my $nproc = $self->detect_nproc; $j = $nproc if $j > $nproc; } - $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + my $op = $lei->workers_start($self, 'lei_import', $j, { + '' => [ \&import_done, $lei ], + }); $self->wq_io_do('import_stdin', []) if $self->{0}; for my $input (@{$self->{inputs}}) { $self->wq_io_do('import_path_url', [], $input); } $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub call { # the main "lei import" method diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index c5153148..f8ca1ee5 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -8,7 +8,6 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use IO::Uncompress::Gunzip qw(gunzip $GunzipError); use PublicInbox::Spawn qw(popen_rd spawn); -use PublicInbox::PktOp; sub do_finish_mirror { # dwaitpid callback my ($arg, $pid) = @_; @@ -279,22 +278,12 @@ sub start { require PublicInbox::Inbox; require PublicInbox::Admin; require PublicInbox::InboxWritable; - my $ops = { - '!' 
=> [ $lei->can('fail_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - '' => [ \&mirror_done, $lei ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); - $self->wq_workers_start('lei_mirror', 1, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + my $op = $lei->workers_start($self, 'lei_mirror', 1, { + '' => [ \&mirror_done, $lei ] + }); $self->wq_io_do('do_mirror', []); $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub ipc_atfork_child {
We need to ensure authentication failures and error codes get propagated to the parent process(es) properly. For now, this will just be a maintainer test which hits a read/write IMAP server on public-inbox.org on a non-standard port with invalid credentials. --- lib/PublicInbox/LeiAuth.pm | 1 + lib/PublicInbox/NetReader.pm | 3 +++ xt/lei-auth-fail.t | 20 ++++++++++++++++++++ 3 files changed, 24 insertions(+) create mode 100644 xt/lei-auth-fail.t diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm index 7210af99..7acb9900 100644 --- a/lib/PublicInbox/LeiAuth.pm +++ b/lib/PublicInbox/LeiAuth.pm @@ -42,6 +42,7 @@ sub auth_eof { sub auth_start { my ($self, $lei, $post_auth_cb, @args) = @_; + $lei->_lei_cfg(1); # workers may need to read config my $op = $lei->workers_start($self, 'auth', 1, { 'nrd_merge' => [ \&nrd_merge, $lei ], '' => [ \&auth_eof, $lei, $post_auth_cb, @args ], diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index ad8c18d0..61ea538b 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -89,6 +89,9 @@ sub mic_for { # mic = Mail::IMAPClient $self->{mic_arg}->{uri_section($uri)} = $mic_arg; } else { $err = "E: <$url> LOGIN: $@\n"; + if ($cred && defined($cred->{password})) { + $err =~ s/\Q$cred->{password}\E/*******/g; + } $mic = undef; } $cred->run($mic ? 
'approve' : 'reject') if $cred; diff --git a/xt/lei-auth-fail.t b/xt/lei-auth-fail.t new file mode 100644 index 00000000..5308d0f9 --- /dev/null +++ b/xt/lei-auth-fail.t @@ -0,0 +1,20 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; + +# TODO: mock IMAP server which fails at authentication so we don't +# have to make external connections to test this: +my $imap_fail = $ENV{TEST_LEI_IMAP_FAIL_URL} // + 'imaps://AzureDiamond:Hunter2@public-inbox.org:994/INBOX'; +test_lei(sub { + ok(!lei(qw(convert -o mboxrd:/dev/stdout), $imap_fail), + 'IMAP auth failure on convert'); + like($lei_err, qr!\bE:.*?imaps://.*?!sm, 'error shown'); + unlike($lei_err, qr!Hunter2!s, 'password not shown'); + is($lei_out, '', 'nothing output'); + ok(!lei(qw(import), $imap_fail), 'IMAP auth failure on import'); + like($lei_err, qr!\bE:.*?imaps://.*?!sm, 'error shown'); + unlike($lei_err, qr!Hunter2!s, 'password not shown'); +}); +done_testing;
Eric Wong <e@80x24.org> wrote: > +++ b/t/lei-convert.t > +test_lei({ tmpdir => $tmpdir }, sub { > + my $dig = Digest::SHA->new(256); > + lei_ok('convert', '-o', "mboxrd:$tmpdir/foo.mboxrd", > + "imap://$host_port/t.v2.0"); > + ok(-f "$tmpdir/foo.mboxrd", 'mboxrd created'); > + $dig->addfile("$tmpdir/foo.mboxrd"); > + my $foo = $dig->digest; > + lei_ok('convert', '-o', "$tmpdir/md", "mboxrd:$tmpdir/foo.mboxrd"); > + ok(-d "$tmpdir/md", 'Maildir created'); > + lei_ok('convert', '-o', "mboxrd:$tmpdir/bar.mboxrd", "$tmpdir/md"); > + $dig->addfile("$tmpdir/bar.mboxrd"); > + my $bar = $dig->digest; > + is($foo, $bar, 'mboxrd round-tripped through Maildir'); Oh dear, I've truly lost my mind :< readdir order is totally random and by some dumb luck this worked when I tested it. > + open my $in, '<', "$tmpdir/bar.mboxrd" or BAIL_OUT; > + my $rdr = { 0 => $in, 1 => \(my $out), 2 => \$lei_err }; > + lei_ok([qw(convert --stdin -F mboxrd -o mboxrd:/dev/stdout)], > + undef, $rdr); > + $dig->add($out); > + is($foo, $dig->digest, 'mboxrd round-tripped --stdin => stdout');
The original t/lei-convert.t was bonkers and is now fixed in 1/4. Minor changes for everything except 3/4, which AFAIK has no changes. Eric Wong (4): lei convert: mail format conversion sub-command lei import: add IMAP and (maildir|mbox*):$PATHNAME support lei: consolidate the bulk of the IPC code lei: check for IMAP auth errors MANIFEST | 6 ++ lib/PublicInbox/GitCredential.pm | 18 ++-- lib/PublicInbox/LEI.pm | 57 +++++++++-- lib/PublicInbox/LeiAuth.pm | 70 +++++++++++++ lib/PublicInbox/LeiConvert.pm | 148 +++++++++++++++++++++++++++ lib/PublicInbox/LeiDedupe.pm | 2 +- lib/PublicInbox/LeiImport.pm | 148 +++++++++++++++++---------- lib/PublicInbox/LeiMirror.pm | 19 +--- lib/PublicInbox/LeiOverview.pm | 7 +- lib/PublicInbox/LeiToMail.pm | 5 +- lib/PublicInbox/MdirReader.pm | 26 +++++ lib/PublicInbox/NetReader.pm | 166 ++++++++++++++++++++++++++++--- lib/PublicInbox/TestCommon.pm | 11 +- t/lei-convert.t | 71 +++++++++++++ t/lei-import-imap.t | 28 ++++++ t/lei-import-maildir.t | 4 +- t/lei_to_mail.t | 10 ++ t/net_reader-imap.t | 40 ++++++++ xt/lei-auth-fail.t | 20 ++++ 19 files changed, 747 insertions(+), 109 deletions(-) create mode 100644 lib/PublicInbox/LeiAuth.pm create mode 100644 lib/PublicInbox/LeiConvert.pm create mode 100644 t/lei-convert.t create mode 100644 t/lei-import-imap.t create mode 100644 t/net_reader-imap.t create mode 100644 xt/lei-auth-fail.t
This will make testing IMAP support for other commands easier, as it doesn't write to lei/store at all. Like the pager and MUA, "git credential" is always spawned by script/lei (and not lei-daemon) so it has a controlling terminal for password prompts. v2: fix missing requires, correct test ordering --- MANIFEST | 4 + lib/PublicInbox/GitCredential.pm | 18 ++-- lib/PublicInbox/LEI.pm | 38 +++++-- lib/PublicInbox/LeiAuth.pm | 80 +++++++++++++++ lib/PublicInbox/LeiConvert.pm | 160 ++++++++++++++++++++++++++++++ lib/PublicInbox/LeiDedupe.pm | 2 +- lib/PublicInbox/LeiOverview.pm | 7 +- lib/PublicInbox/LeiToMail.pm | 5 +- lib/PublicInbox/MdirReader.pm | 26 +++++ lib/PublicInbox/NetReader.pm | 163 ++++++++++++++++++++++++++++--- lib/PublicInbox/TestCommon.pm | 11 ++- t/lei-convert.t | 71 ++++++++++++++ t/net_reader-imap.t | 40 ++++++++ 13 files changed, 588 insertions(+), 37 deletions(-) create mode 100644 lib/PublicInbox/LeiAuth.pm create mode 100644 lib/PublicInbox/LeiConvert.pm create mode 100644 t/lei-convert.t create mode 100644 t/net_reader-imap.t diff --git a/MANIFEST b/MANIFEST index 82068900..4f146771 100644 --- a/MANIFEST +++ b/MANIFEST @@ -178,6 +178,8 @@ lib/PublicInbox/InputPipe.pm lib/PublicInbox/Isearch.pm lib/PublicInbox/KQNotify.pm lib/PublicInbox/LEI.pm +lib/PublicInbox/LeiAuth.pm +lib/PublicInbox/LeiConvert.pm lib/PublicInbox/LeiCurl.pm lib/PublicInbox/LeiDedupe.pm lib/PublicInbox/LeiExternal.pm @@ -360,6 +362,7 @@ t/init.t t/ipc.t t/iso-2202-jp.eml t/kqnotify.t +t/lei-convert.t t/lei-daemon.t t/lei-externals.t t/lei-import-maildir.t @@ -388,6 +391,7 @@ t/msg_iter.t t/msgmap.t t/msgtime.t t/multi-mid.t +t/net_reader-imap.t t/nntp.t t/nntpd-tls.t t/nntpd-v2.t diff --git a/lib/PublicInbox/GitCredential.pm b/lib/PublicInbox/GitCredential.pm index 9e193029..2d81817c 100644 --- a/lib/PublicInbox/GitCredential.pm +++ b/lib/PublicInbox/GitCredential.pm @@ -4,11 +4,17 @@ package PublicInbox::GitCredential; use strict; use PublicInbox::Spawn qw(popen_rd); -sub 
run ($$) { - my ($self, $op) = @_; - my ($in_r, $in_w); +sub run ($$;$) { + my ($self, $op, $lei) = @_; + my ($in_r, $in_w, $out_r); + my $cmd = [ qw(git credential), $op ]; pipe($in_r, $in_w) or die "pipe: $!"; - my $out_r = popen_rd([qw(git credential), $op], undef, { 0 => $in_r }); + if ($lei && !$lei->{oneshot}) { # we'll die if disconnected: + pipe($out_r, my $out_w) or die "pipe: $!"; + $lei->send_exec_cmd([ $in_r, $out_w ], $cmd, {}); + } else { + $out_r = popen_rd($cmd, undef, { 0 => $in_r }); + } close $in_r or die "close in_r: $!"; my $out = ''; @@ -41,8 +47,8 @@ sub check_netrc ($) { } sub fill { - my ($self) = @_; - my $out_r = run($self, 'fill'); + my ($self, $lei) = @_; + my $out_r = run($self, 'fill', $lei); while (<$out_r>) { chomp; return if $_ eq ''; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 1fa9f751..1e4c36d0 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -173,7 +173,11 @@ our %CMD = ( # sorted in order of importance/use: qw(stdin| offset=i recursive|r exclude=s include|I=s format|f=s kw|keywords|flags!), ], - +'convert' => [ 'LOCATION...|--stdin', + 'one-time conversion from URL or filesystem to another format', + qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s quiet|q + kw|keywords|flags!), + ], 'config' => [ '[...]', sub { 'git-config(1) wrapper for '._config_path($_[0]); }, qw(config-file|system|global|file|f=s), # for conflict detection @@ -320,7 +324,7 @@ my %CONFIG_KEYS = ( 'leistore.dir' => 'top-level storage location', ); -my @WQ_KEYS = qw(lxs l2m imp mrr); # internal workers +my @WQ_KEYS = qw(lxs l2m imp mrr cnv auth); # internal workers # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE sub x_it ($$) { @@ -391,18 +395,19 @@ sub fail ($$;$) { undef; } -sub check_input_format ($;$) { - my ($self, $files) = @_; - my $fmt = $self->{opt}->{'format'}; +sub check_input_format ($;$$) { + my ($self, $files, $opt_key) = @_; + $opt_key //= 'format'; + my $fmt = 
$self->{opt}->{$opt_key}; if (!$fmt) { my $err = $files ? "regular file(s):\n@$files" : '--stdin'; - return fail($self, "--format unset for $err"); + return fail($self, "--$opt_key unset for $err"); } return 1 if $fmt eq 'eml'; # XXX: should this handle {gz,bz2,xz}? that's currently in LeiToMail require PublicInbox::MboxReader; PublicInbox::MboxReader->can($fmt) || - fail($self, "--format=$fmt unrecognized"); + fail($self, "--$opt_key=$fmt unrecognized"); } sub out ($;@) { @@ -445,6 +450,7 @@ sub lei_atfork_child { } else { delete $self->{0}; } + delete @$self{qw(cnv)}; for (delete @$self{qw(3 sock old_1 au_done)}) { close($_) if defined($_); } @@ -626,6 +632,11 @@ sub lei_import { PublicInbox::LeiImport->call(@_); } +sub lei_convert { + require PublicInbox::LeiConvert; + PublicInbox::LeiConvert->call(@_); +} + sub lei_init { my ($self, $dir) = @_; my $cfg = _lei_cfg($self, 1); @@ -770,6 +781,13 @@ sub start_mua { delete $self->{opt}->{verbose}; } +sub send_exec_cmd { # tell script/lei to execute a command + my ($self, $io, $cmd, $env) = @_; + my $sock = $self->{sock} // die 'lei client gone'; + my $fds = [ map { fileno($_) } @$io ]; + $send_cmd->($sock, $fds, exec_buf($cmd, $env), MSG_EOR); +} + sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail my ($self) = @_; my $alerts = $self->{opt}->{alert} // return; @@ -813,10 +831,9 @@ sub start_pager { pipe(my ($r, $wpager)) or return warn "pipe: $!"; my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} }; my $pgr = [ undef, @$rdr{1, 2} ]; - if (my $sock = $self->{sock}) { # lei(1) process runs it + if ($self->{sock}) { # lei(1) process runs it delete @$new_env{keys %$env}; # only set iff unset - my $fds = [ map { fileno($_) } @$rdr{0..2} ]; - $send_cmd->($sock, $fds, exec_buf([$pager], $new_env), MSG_EOR); + send_exec_cmd($self, [ @$rdr{0..2} ], [$pager], $new_env); } elsif ($self->{oneshot}) { my $cmd = [$pager]; $self->{"pid.$self.$$"}->{spawn($cmd, $new_env, $rdr)} = $cmd; @@ -920,6 
+937,7 @@ sub event_step { sub event_step_init { my ($self) = @_; + return if $self->{-event_init_done}++; if (my $sock = $self->{sock}) { # using DS->EventLoop $self->SUPER::new($sock, EPOLLIN|EPOLLET); } diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm new file mode 100644 index 00000000..88310874 --- /dev/null +++ b/lib/PublicInbox/LeiAuth.pm @@ -0,0 +1,80 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# Authentication worker for anything that needs auth for read/write IMAP +# (eventually for read-only NNTP access) +package PublicInbox::LeiAuth; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC); +use PublicInbox::PktOp qw(pkt_do); +use PublicInbox::NetReader; + +sub nrd_merge { + my ($lei, $nrd_new) = @_; + if ($lei->{pkt_op_p}) { # from lei_convert worker + pkt_do($lei->{pkt_op_p}, 'nrd_merge', $nrd_new); + } else { # single lei-daemon consumer + my $self = $lei->{auth} or return; # client disconnected + my $nrd = $self->{nrd}; + %$nrd = (%$nrd, %$nrd_new); + } +} + +sub do_auth { # called via wq_io_do + my ($self) = @_; + my ($lei, $nrd) = @$self{qw(lei nrd)}; + $nrd->imap_common_init($lei); + nrd_merge($lei, $nrd); # tell lei-daemon updated auth info +} + +sub do_finish_auth { # dwaitpid callback + my ($arg, $pid) = @_; + my ($self, $lei, $post_auth_cb, @args) = @$arg; + $? ? $lei->dclose : $post_auth_cb->(@args); +} + +sub auth_eof { + my ($lei, $post_auth_cb, @args) = @_; + my $self = delete $lei->{auth} or return; + $self->wq_wait_old(\&do_finish_auth, $lei, $post_auth_cb, @args); +} + +sub auth_start { + my ($self, $lei, $post_auth_cb, @args) = @_; + my $ops = { + '!' 
=> [ $lei->can('fail_handler'), $lei ], + '|' => [ $lei->can('sigpipe_handler'), $lei ], + 'x_it' => [ $lei->can('x_it'), $lei ], + 'child_error' => [ $lei->can('child_error'), $lei ], + 'nrd_merge' => [ \&nrd_merge, $lei ], + '' => [ \&auth_eof, $lei, $post_auth_cb, @args ], + }; + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + $self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei}); + my $op = delete $lei->{pkt_op_c}; + delete $lei->{pkt_op_p}; + $self->wq_io_do('do_auth', []); + $self->wq_close(1); + $lei->event_step_init; # wait for shutdowns + if ($lei->{oneshot}) { + while ($op->{sock}) { $op->event_step } + } +} + +sub ipc_atfork_child { + my ($self) = @_; + # prevent {sock} from being closed in lei_atfork_child: + my $s = delete $self->{lei}->{sock}; + delete $self->{lei}->{auth}; # drop circular ref + $self->{lei}->lei_atfork_child; + $self->{lei}->{sock} = $s if $s; + $self->SUPER::ipc_atfork_child; +} + +sub new { + my ($cls, $nrd) = @_; + bless { nrd => $nrd }, $cls; +} + +1; diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm new file mode 100644 index 00000000..78fd5e17 --- /dev/null +++ b/lib/PublicInbox/LeiConvert.pm @@ -0,0 +1,160 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# front-end for the "lei convert" sub-command +package PublicInbox::LeiConvert; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC); +use PublicInbox::Eml; +use PublicInbox::InboxWritable qw(eml_from_path); +use PublicInbox::PktOp; +use PublicInbox::LeiStore; +use PublicInbox::LeiOverview; + +sub mbox_cb { + my ($eml, $self) = @_; + my @kw = PublicInbox::LeiStore::mbox_keywords($eml); + $eml->header_set($_) for qw(Status X-Status); + $self->{wcb}->(undef, { kw => \@kw }, $eml); +} + +sub imap_cb { # ->imap_each + my ($url, $uid, $kw, $eml, $self) = @_; + $self->{wcb}->(undef, { kw => $kw }, $eml); +} + +sub mdir_cb { + my 
($kw, $eml, $self) = @_; + $self->{wcb}->(undef, { kw => $kw }, $eml); +} + +sub do_convert { # via wq_do + my ($self) = @_; + my $lei = $self->{lei}; + my $in_fmt = $lei->{opt}->{'in-format'}; + if (my $stdin = delete $self->{0}) { + PublicInbox::MboxReader->$in_fmt($stdin, \&mbox_cb, $self); + } + for my $input (@{$self->{inputs}}) { + my $ifmt = lc($in_fmt // ''); + if ($input =~ m!\A(?:imap|nntp)s?://!) { # TODO: nntp + $lei->{nrd}->imap_each($input, \&imap_cb, $self); + next; + } elsif ($input =~ s!\A([a-z0-9]+):!!i) { + $ifmt = lc $1; + } + if (-f $input) { + open my $fh, '<', $input or + return $lei->fail("open $input: $!"); + PublicInbox::MboxReader->$ifmt($fh, \&mbox_cb, $self); + } elsif (-d _) { + PublicInbox::MdirReader::maildir_each_eml($input, + \&mdir_cb, $self); + } else { + die "BUG: $input unhandled"; # should've failed earlier + } + } + delete $lei->{1}; + delete $self->{wcb}; # commit +} + +sub convert_start { + my ($lei) = @_; + my $ops = { + '!' => [ $lei->can('fail_handler'), $lei ], + '|' => [ $lei->can('sigpipe_handler'), $lei ], + 'x_it' => [ $lei->can('x_it'), $lei ], + 'child_error' => [ $lei->can('child_error'), $lei ], + '' => [ $lei->can('dclose'), $lei ], + }; + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + my $self = $lei->{cnv}; + $self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei}); + my $op = delete $lei->{pkt_op_c}; + delete $lei->{pkt_op_p}; + $self->wq_io_do('do_convert', []); + $self->wq_close(1); + $lei->event_step_init; # wait for shutdowns + if ($lei->{oneshot}) { + while ($op->{sock}) { $op->event_step } + } +} + +sub call { # the main "lei convert" method + my ($cls, $lei, @inputs) = @_; + my $opt = $lei->{opt}; + $opt->{kw} //= 1; + my $self = $lei->{cnv} = bless {}, $cls; + my $in_fmt = $opt->{'in-format'}; + my ($nrd, @f, @d); + $opt->{dedupe} //= 'none'; + my $ovv = PublicInbox::LeiOverview->new($lei, 'out-format'); + $lei->{l2m} or return + $lei->fail("output not 
specified or is not a mail destination"); + $opt->{augment} = 1 unless $ovv->{dst} eq '/dev/stdout'; + if ($opt->{stdin}) { + @inputs and return $lei->fail("--stdin and @inputs do not mix"); + $lei->check_input_format(undef, 'in-format') or return; + $self->{0} = $lei->{0}; + } + # e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX + for my $input (@inputs) { + my $input_path = $input; + if ($input =~ m!\A(?:imap|nntp)s?://!i) { + require PublicInbox::NetReader; + $nrd //= PublicInbox::NetReader->new; + $nrd->add_url($input); + } elsif ($input_path =~ s/\A([a-z0-9]+)://is) { + my $ifmt = lc $1; + if (($in_fmt // $ifmt) ne $ifmt) { + return $lei->fail(<<""); +--in-format=$in_fmt and `$ifmt:' conflict + + } + if (-f $input_path) { + require PublicInbox::MboxReader; + PublicInbox::MboxReader->can($ifmt) or return + $lei->fail("$ifmt not supported"); + } elsif (-d _) { + require PublicInbox::MdirReader; + $ifmt eq 'maildir' or return + $lei->fail("$ifmt not supported"); + } else { + return $lei->fail("Unable to handle $input"); + } + } elsif (-f $input) { push @f, $input } + elsif (-d _) { push @d, $input } + else { return $lei->fail("Unable to handle $input") } + } + if (@f) { $lei->check_input_format(\@f, 'in-format') or return } + if (@d) { # TODO: check for MH vs Maildir, here + require PublicInbox::MdirReader; + } + $self->{inputs} = \@inputs; + return convert_start($lei) if !$nrd; + + if (my $err = $nrd->errors) { + return $lei->fail($err); + } + $nrd->{quiet} = $opt->{quiet}; + $lei->{nrd} = $nrd; + require PublicInbox::LeiAuth; + my $auth = $lei->{auth} = PublicInbox::LeiAuth->new($nrd); + $auth->auth_start($lei, \&convert_start, $lei); +} + +sub ipc_atfork_child { + my ($self) = @_; + my $lei = $self->{lei}; + $lei->lei_atfork_child; + my $l2m = delete $lei->{l2m}; + $l2m->pre_augment($lei); + $l2m->do_augment($lei); + $l2m->post_augment($lei); + $self->{wcb} = $l2m->write_cb($lei); + $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); + 
$self->SUPER::ipc_atfork_child; +} + +1; diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm index 2114c0e8..5fec9384 100644 --- a/lib/PublicInbox/LeiDedupe.pm +++ b/lib/PublicInbox/LeiDedupe.pm @@ -127,7 +127,7 @@ sub prepare_dedupe { sub pause_dedupe { my ($self) = @_; - my $skv = $self->[0]; + my $skv = $self->[0] or return; $skv->dbh_release; delete($skv->{dbh}) if $skv; } diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index c820f0d7..3169bae6 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -51,18 +51,19 @@ sub detect_fmt ($$) { } sub new { - my ($class, $lei) = @_; + my ($class, $lei, $ofmt_key) = @_; my $opt = $lei->{opt}; my $dst = $opt->{output} // '-'; $dst = '/dev/stdout' if $dst eq '-'; + $ofmt_key //= 'format'; - my $fmt = $opt->{'format'}; + my $fmt = $opt->{$ofmt_key}; $fmt = lc($fmt) if defined $fmt; if ($dst =~ s/\A([a-z0-9]+)://is) { # e.g. Maildir:/home/user/Mail/ my $ofmt = lc $1; $fmt //= $ofmt; return $lei->fail(<<"") if $fmt ne $ofmt; ---format=$fmt and --output=$ofmt conflict +--$ofmt_key=$fmt and --output=$ofmt conflict } $fmt //= 'json' if $dst eq '/dev/stdout'; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index e3e512be..f0adc44f 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -437,7 +437,7 @@ sub _do_augment_mbox { $dedupe->pause_dedupe if $dedupe; } -sub pre_augment { # fast (1 disk seek), runs in main daemon +sub pre_augment { # fast (1 disk seek), runs in same process as post_augment my ($self, $lei) = @_; # _pre_augment_maildir, _pre_augment_mbox my $m = "_pre_augment_$self->{base_type}"; @@ -451,7 +451,8 @@ sub do_augment { # slow, runs in wq worker $self->$m($lei); } -sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon +# fast (spawn compressor or mkdir), runs in same process as pre_augment +sub post_augment { my ($self, $lei, @args) = @_; # _post_augment_maildir, 
_post_augment_mbox my $m = "_post_augment_$self->{base_type}"; diff --git a/lib/PublicInbox/MdirReader.pm b/lib/PublicInbox/MdirReader.pm index e0ff676d..5fa534f5 100644 --- a/lib/PublicInbox/MdirReader.pm +++ b/lib/PublicInbox/MdirReader.pm @@ -7,6 +7,7 @@ package PublicInbox::MdirReader; use strict; use v5.10.1; +use PublicInbox::InboxWritable qw(eml_from_path); # returns Maildir flags from a basename ('' for no flags, undef for invalid) sub maildir_basename_flags { @@ -36,4 +37,29 @@ sub maildir_each_file ($$;@) { } } +my %c2kw = ('D' => 'draft', F => 'flagged', R => 'answered', S => 'seen'); + +sub maildir_each_eml ($$;@) { + my ($dir, $cb, @arg) = @_; + $dir .= '/' unless substr($dir, -1) eq '/'; + my $pfx = "$dir/new/"; + if (opendir(my $dh, $pfx)) { + while (defined(my $bn = readdir($dh))) { + next if substr($bn, 0, 1) eq '.'; + my @f = split(/:/, $bn, -1); + next if scalar(@f) != 1; + my $eml = eml_from_path($pfx.$bn) or next; + $cb->([], $eml, @arg); + } + } + $pfx = "$dir/cur/"; + opendir my $dh, $pfx or return; + while (defined(my $bn = readdir($dh))) { + my $fl = maildir_basename_flags($bn) // next; + my $eml = eml_from_path($pfx.$bn) or next; + my @kw = sort(map { $c2kw{$_} // () } split(//, $fl)); + $cb->(\@kw, $eml, @arg); + } +} + 1; diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index 1d053425..ad8c18d0 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -5,7 +5,8 @@ package PublicInbox::NetReader; use strict; use v5.10.1; -use parent qw(Exporter); +use parent qw(Exporter PublicInbox::IPC); +use PublicInbox::Eml; # TODO: trim this down, this is huge our @EXPORT = qw(uri_new uri_scheme uri_section @@ -33,7 +34,7 @@ sub uri_section ($) { sub auth_anon_cb { '' }; # for Mail::IMAPClient::Authcallback sub mic_for { # mic = Mail::IMAPClient - my ($self, $url, $mic_args) = @_; + my ($self, $url, $mic_args, $lei) = @_; require PublicInbox::URIimap; my $uri = PublicInbox::URIimap->new($url); require 
PublicInbox::GitCredential; @@ -74,21 +75,26 @@ sub mic_for { # mic = Mail::IMAPClient } if ($cred) { $cred->check_netrc unless defined $cred->{password}; - $cred->fill; # may prompt user here + $cred->fill($lei); # may prompt user here $mic->User($mic_arg->{User} = $cred->{username}); $mic->Password($mic_arg->{Password} = $cred->{password}); } else { # AUTH=ANONYMOUS $mic->Authmechanism($mic_arg->{Authmechanism} = 'ANONYMOUS'); - $mic->Authcallback($mic_arg->{Authcallback} = \&auth_anon_cb); + $mic_arg->{Authcallback} = 'auth_anon_cb'; + $mic->Authcallback(\&auth_anon_cb); } + my $err; if ($mic->login && $mic->IsAuthenticated) { # success! keep IMAPClient->new arg in case we get disconnected $self->{mic_arg}->{uri_section($uri)} = $mic_arg; } else { - warn "E: <$url> LOGIN: $@\n"; + $err = "E: <$url> LOGIN: $@\n"; $mic = undef; } $cred->run($mic ? 'approve' : 'reject') if $cred; + if ($err) { + $lei ? $lei->fail($err) : warn($err); + } $mic; } @@ -139,8 +145,8 @@ E: <$url> STARTTLS requested and failed $nn; } -sub nn_for ($$$) { # nn = Net::NNTP - my ($self, $url, $nn_args) = @_; +sub nn_for ($$$;$) { # nn = Net::NNTP + my ($self, $url, $nn_args, $lei) = @_; my $uri = uri_new($url); my $sec = uri_section($uri); my $nntp_opt = $self->{nntp_opt}->{$sec} //= {}; @@ -170,7 +176,7 @@ sub nn_for ($$$) { # nn = Net::NNTP my $nn = nn_new($nn_arg, $nntp_opt, $url); if ($cred) { - $cred->fill; # may prompt user here + $cred->fill($lei); # may prompt user here if ($nn->authinfo($u, $p)) { push @{$nntp_opt->{-postconn}}, [ 'authinfo', $u, $p ]; } else { @@ -240,14 +246,15 @@ sub cfg_bool ($$$) { } # flesh out common IMAP-specific data structures -sub imap_common_init ($) { - my ($self) = @_; +sub imap_common_init ($;$) { + my ($self, $lei) = @_; + $self->{quiet} = 1 if $lei && $lei->{opt}->{quiet}; eval { require PublicInbox::IMAPClient } or die "Mail::IMAPClient is required for IMAP:\n$@\n"; eval { require PublicInbox::IMAPTracker } or die "DBD::SQLite is required for 
IMAP\n:$@\n"; require PublicInbox::URIimap; - my $cfg = $self->{pi_cfg}; + my $cfg = $self->{pi_cfg} // $lei->_lei_cfg; my $mic_args = {}; # scheme://authority => Mail:IMAPClient arg for my $url (@{$self->{imap_order}}) { my $uri = PublicInbox::URIimap->new($url); @@ -275,7 +282,8 @@ sub imap_common_init ($) { my $mics = {}; # schema://authority => IMAPClient obj for my $url (@{$self->{imap_order}}) { my $uri = PublicInbox::URIimap->new($url); - $mics->{uri_section($uri)} //= mic_for($self, $url, $mic_args); + my $sec = uri_section($uri); + $mics->{$sec} //= mic_for($self, $url, $mic_args, $lei); } $mics; } @@ -294,9 +302,140 @@ sub errors { if (my $u = $self->{unsupported_url}) { return "Unsupported URL(s): @$u"; } + if ($self->{imap_order}) { + eval { require PublicInbox::IMAPClient } or + die "Mail::IMAPClient is required for IMAP:\n$@\n"; + } undef; } +my %IMAPflags2kw = ( + '\Seen' => 'seen', + '\Answered' => 'answered', + '\Flagged' => 'flagged', + '\Draft' => 'draft', +); + +sub _imap_do_msg ($$$$$) { + my ($self, $url, $uid, $raw, $flags) = @_; + # our target audience expects LF-only, save storage + $$raw =~ s/\r\n/\n/sg; + my $kw = []; + for my $f (split(/ /, $flags)) { + my $k = $IMAPflags2kw{$f} // next; # TODO: X-Label? 
+ push @$kw, $k; + } + my ($eml_cb, @args) = @{$self->{eml_each}}; + $eml_cb->($url, $uid, $kw, PublicInbox::Eml->new($raw), @args); +} + +sub _imap_fetch_all ($$$) { + my ($self, $mic, $url) = @_; + my $uri = PublicInbox::URIimap->new($url); + my $sec = uri_section($uri); + my $mbx = $uri->mailbox; + $mic->Clear(1); # trim results history + $mic->examine($mbx) or return "E: EXAMINE $mbx ($sec) failed: $!"; + my ($r_uidval, $r_uidnext); + for ($mic->Results) { + /^\* OK \[UIDVALIDITY ([0-9]+)\].*/ and $r_uidval = $1; + /^\* OK \[UIDNEXT ([0-9]+)\].*/ and $r_uidnext = $1; + last if $r_uidval && $r_uidnext; + } + $r_uidval //= $mic->uidvalidity($mbx) // + return "E: $url cannot get UIDVALIDITY"; + $r_uidnext //= $mic->uidnext($mbx) // + return "E: $url cannot get UIDNEXT"; + my $itrk = $self->{incremental} ? + PublicInbox::IMAPTracker->new($url) : 0; + my ($l_uidval, $l_uid) = $itrk ? $itrk->get_last : (); + $l_uidval //= $r_uidval; # first time + $l_uid //= 1; + if ($l_uidval != $r_uidval) { + return "E: $url UIDVALIDITY mismatch\n". + "E: local=$l_uidval != remote=$r_uidval"; + } + my $r_uid = $r_uidnext - 1; + if ($l_uid != 1 && $l_uid > $r_uid) { + return "E: $url local UID exceeds remote ($l_uid > $r_uid)\n". + "E: $url strangely, UIDVALIDLITY matches ($l_uidval)\n"; + } + return if $l_uid >= $r_uid; # nothing to do + + warn "# $url fetching UID $l_uid:$r_uid\n" unless $self->{quiet}; + $mic->Uid(1); # the default, we hope + my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1; + my $req = $mic->imap4rev1 ? 
'BODY.PEEK[]' : 'RFC822.PEEK'; + my $key = $req; + $key =~ s/\.PEEK//; + my ($uids, $batch); + my $err; + do { + # I wish "UID FETCH $START:*" could work, but: + # 1) servers do not need to return results in any order + # 2) Mail::IMAPClient doesn't offer a streaming API + $uids = $mic->search("UID $l_uid:*") or + return "E: $url UID SEARCH $l_uid:* error: $!"; + return if scalar(@$uids) == 0; + + # RFC 3501 doesn't seem to indicate order of UID SEARCH + # responses, so sort it ourselves. Order matters so + # IMAPTracker can store the newest UID. + @$uids = sort { $a <=> $b } @$uids; + + # Did we actually get new messages? + return if $uids->[0] < $l_uid; + + $l_uid = $uids->[-1] + 1; # for next search + my $last_uid; + my $n = $self->{max_batch}; + while (scalar @$uids) { + my @batch = splice(@$uids, 0, $bs); + $batch = join(',', @batch); + local $0 = "UID:$batch $mbx $sec"; + my $r = $mic->fetch_hash($batch, $req, 'FLAGS'); + unless ($r) { # network error? + $err = "E: $url UID FETCH $batch error: $!"; + last; + } + for my $uid (@batch) { + # messages get deleted, so holes appear + my $per_uid = delete $r->{$uid} // next; + my $raw = delete($per_uid->{$key}) // next; + _imap_do_msg($self, $url, $uid, \$raw, + $per_uid->{FLAGS}); + $last_uid = $uid; + last if $self->{quit}; + } + last if $self->{quit}; + } + $itrk->update_last($r_uidval, $last_uid) if $itrk; + } until ($err || $self->{quit}); + $err; +} + +sub imap_each { + my ($self, $url, $eml_cb, @args) = @_; + my $uri = PublicInbox::URIimap->new($url); + my $sec = uri_section($uri); + my $mic_arg = $self->{mic_arg}->{$sec} or + die "BUG: no Mail::IMAPClient->new arg for $sec"; + local $0 = $uri->mailbox." 
$sec"; + my $cb_name = $mic_arg->{Authcallback}; + if (ref($cb_name) ne 'CODE') { + $mic_arg->{Authcallback} = $self->can($cb_name); + } + my $mic = PublicInbox::IMAPClient->new(%$mic_arg, Debug => 0); + my $err; + if ($mic && $mic->IsConnected) { + local $self->{eml_each} = [ $eml_cb, @args ]; + $err = _imap_fetch_all($self, $mic, $url); + } else { + $err = "E: not connected: $!"; + } + $mic; +} + sub new { bless {}, shift }; 1; diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index c5070cfd..3eb08e9f 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -462,10 +462,15 @@ our $lei = sub { sub lei (@) { $lei->(@_) } sub lei_ok (@) { - my $msg = ref($_[-1]) ? pop(@_) : undef; + my $msg = ref($_[-1]) eq 'SCALAR' ? pop(@_) : undef; + my $tmpdir = quotemeta(File::Spec->tmpdir); # filter out anything that looks like a path name for consistent logs - my @msg = grep(!m!\A/!, @_); - ok($lei->(@_), "lei @msg". ($msg ? " ($$msg)" : '')); + my @msg = ref($_[0]) eq 'ARRAY' ? @{$_[0]} : @_; + for (@msg) { + s!\A([a-z0-9]+://)[^/]+/!$1\$HOST_PORT/! || + s!$tmpdir\b/(?:[^/]+/)?!\$TMPDIR/!; + } + ok(lei(@_), "lei @msg". ($msg ? 
" ($$msg)" : '')) or diag $lei_err; } sub json_utf8 () { diff --git a/t/lei-convert.t b/t/lei-convert.t new file mode 100644 index 00000000..f58a0a80 --- /dev/null +++ b/t/lei-convert.t @@ -0,0 +1,71 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; +use PublicInbox::MboxReader; +use PublicInbox::MdirReader; +use PublicInbox::NetReader; +require_git 2.6; +require_mods(qw(DBD::SQLite Search::Xapian)); +my ($tmpdir, $for_destroy) = tmpdir; +my $sock = tcp_server; +my $cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/1", "--stderr=$tmpdir/2" ]; +my ($ro_home, $cfg_path) = setup_public_inboxes; +my $env = { PI_CONFIG => $cfg_path }; +my $td = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-imapd: $?"); +my $host_port = tcp_host_port($sock); +undef $sock; +test_lei({ tmpdir => $tmpdir }, sub { + my $d = $ENV{HOME}; + my $dig = Digest::SHA->new(256); + lei_ok('convert', '-o', "mboxrd:$d/foo.mboxrd", + "imap://$host_port/t.v2.0"); + ok(-f "$d/foo.mboxrd", 'mboxrd created'); + my (@mboxrd, @mboxcl2); + open my $fh, '<', "$d/foo.mboxrd" or BAIL_OUT $!; + PublicInbox::MboxReader->mboxrd($fh, sub { push @mboxrd, shift }); + ok(scalar(@mboxrd) > 1, 'got multiple messages'); + + lei_ok('convert', '-o', "mboxcl2:$d/cl2", "mboxrd:$d/foo.mboxrd"); + ok(-s "$d/cl2", 'mboxcl2 non-empty') or diag $lei_err; + open $fh, '<', "$d/cl2" or BAIL_OUT $!; + PublicInbox::MboxReader->mboxcl2($fh, sub { + my $eml = shift; + $eml->header_set($_) for (qw(Content-Length Lines)); + push @mboxcl2, $eml; + }); + is_deeply(\@mboxcl2, \@mboxrd, 'mboxrd and mboxcl2 have same mail'); + + lei_ok('convert', '-o', "$d/md", "mboxrd:$d/foo.mboxrd"); + ok(-d "$d/md", 'Maildir created'); + my @md; + PublicInbox::MdirReader::maildir_each_eml("$d/md", sub { + push @md, $_[1]; + }); + is(scalar(@md), scalar(@mboxrd), 'got expected emails in Maildir'); + @md = sort { 
${$a->{bdy}} cmp ${$b->{bdy}} } @md; + @mboxrd = sort { ${$a->{bdy}} cmp ${$b->{bdy}} } @mboxrd; + my @rd_nostatus = map { + my $eml = PublicInbox::Eml->new(\($_->as_string)); + $eml->header_set('Status'); + $eml; + } @mboxrd; + is_deeply(\@md, \@rd_nostatus, 'Maildir output matches mboxrd'); + + my @bar; + lei_ok('convert', '-o', "mboxrd:$d/bar.mboxrd", "$d/md"); + open $fh, '<', "$d/bar.mboxrd" or BAIL_OUT $!; + PublicInbox::MboxReader->mboxrd($fh, sub { push @bar, shift }); + @bar = sort { ${$a->{bdy}} cmp ${$b->{bdy}} } @bar; + is_deeply(\@mboxrd, \@bar, + 'mboxrd round-tripped through Maildir w/ flags'); + + open my $in, '<', "$d/foo.mboxrd" or BAIL_OUT; + my $rdr = { 0 => $in, 1 => \(my $out), 2 => \$lei_err }; + lei_ok([qw(convert --stdin -F mboxrd -o mboxrd:/dev/stdout)], + undef, $rdr); + open $fh, '<', "$d/foo.mboxrd" or BAIL_OUT; + my $exp = do { local $/; <$fh> }; + is($out, $exp, 'stdin => stdout'); +}); +done_testing; diff --git a/t/net_reader-imap.t b/t/net_reader-imap.t new file mode 100644 index 00000000..eea8b0fd --- /dev/null +++ b/t/net_reader-imap.t @@ -0,0 +1,40 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; +require_git 2.6; +require_mods(qw(DBD::SQLite Search::Xapian)); +my ($tmpdir, $for_destroy) = tmpdir; +my ($ro_home, $cfg_path) = setup_public_inboxes; +my $cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/1", "--stderr=$tmpdir/2" ]; +my $sock = tcp_server; +my $env = { PI_CONFIG => $cfg_path }; +my $td = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT "-imapd: $?"; +my ($host, $port) = tcp_host_port $sock; +require_ok 'PublicInbox::NetReader'; +my $nrd = PublicInbox::NetReader->new; +$nrd->add_url(my $url = "imap://$host:$port/t.v2.0"); +is($nrd->errors, undef, 'no errors'); +$nrd->{pi_cfg} = PublicInbox::Config->new($cfg_path); +$nrd->imap_common_init; +$nrd->{quiet} = 1; +my (%eml, %urls, 
%args, $nr, @w); +local $SIG{__WARN__} = sub { push(@w, @_) }; +$nrd->imap_each($url, sub { + my ($u, $uid, $kw, $eml, $arg) = @_; + ++$urls{$u}; + ++$args{$arg}; + like($uid, qr/\A[0-9]+\z/, 'got digit UID '.$uid); + ++$eml{ref($eml)}; + ++$nr; +}, 'blah'); +is(scalar(@w), 0, 'no warnings'); +ok($nr, 'got some emails'); +is($eml{'PublicInbox::Eml'}, $nr, 'got expected Eml objects'); +is(scalar keys %eml, 1, 'only got Eml objects'); +is($urls{$url}, $nr, 'one URL expected number of times'); +is(scalar keys %urls, 1, 'only got one URL'); +is($args{blah}, $nr, 'got arg expected number of times'); +is(scalar keys %args, 1, 'only got one arg'); + +done_testing;
This makes "lei import" more similar to "lei convert" and allows importing from disparate sources simultaneously. We'll also fix some ->child_error usage errors and make the style of the code more similar to the "lei convert" code. v2: fix missing requires --- MANIFEST | 1 + lib/PublicInbox/LeiImport.pm | 129 ++++++++++++++++++++++++----------- t/lei-import-imap.t | 28 ++++++++ t/lei-import-maildir.t | 4 +- t/lei_to_mail.t | 10 +++ 5 files changed, 130 insertions(+), 42 deletions(-) create mode 100644 t/lei-import-imap.t diff --git a/MANIFEST b/MANIFEST index 4f146771..19f73356 100644 --- a/MANIFEST +++ b/MANIFEST @@ -365,6 +365,7 @@ t/kqnotify.t t/lei-convert.t t/lei-daemon.t t/lei-externals.t +t/lei-import-imap.t t/lei-import-maildir.t t/lei-import.t t/lei-mirror.t diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 32f3a467..62a2a412 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -29,7 +29,7 @@ sub import_done { # EOF callback for main daemon $imp->wq_wait_old(\&import_done_wait, $lei); } -sub do_import { +sub import_start { my ($lei) = @_; my $ops = { '!' 
=> [ $lei->can('fail_handler'), $lei ], @@ -39,7 +39,7 @@ sub do_import { }; ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); my $self = $lei->{imp}; - my $j = $lei->{opt}->{jobs} // scalar(@{$self->{argv}}) || 1; + my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1; if (my $nrd = $lei->{nrd}) { # $j = $nrd->net_concurrency($j); TODO } else { @@ -50,8 +50,8 @@ sub do_import { my $op = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; $self->wq_io_do('import_stdin', []) if $self->{0}; - for my $x (@{$self->{argv}}) { - $self->wq_io_do('import_path_url', [], $x); + for my $input (@{$self->{inputs}}) { + $self->wq_io_do('import_path_url', [], $input); } $self->wq_close(1); $lei->event_step_init; # wait for shutdowns @@ -61,60 +61,91 @@ sub do_import { } sub call { # the main "lei import" method - my ($cls, $lei, @argv) = @_; + my ($cls, $lei, @inputs) = @_; my $sto = $lei->_lei_store(1); $sto->write_prepare($lei); + my ($nrd, @f, @d); $lei->{opt}->{kw} //= 1; - my $self = $lei->{imp} = bless { argv => \@argv }, $cls; + my $self = $lei->{imp} = bless { inputs => \@inputs }, $cls; if ($lei->{opt}->{stdin}) { - @argv and return - $lei->fail("--stdin and locations (@argv) do not mix"); + @inputs and return $lei->fail("--stdin and @inputs do not mix"); $lei->check_input_format or return; $self->{0} = $lei->{0}; - } else { - my @f; - for my $x (@argv) { - if (-f $x) { push @f, $x } - elsif (-d _) { require PublicInbox::MdirReader } - else { - require PublicInbox::NetReader; - $lei->{nrd} //= PublicInbox::NetReader->new; - $lei->{nrd}->add_url($x); + } + + # TODO: do we need --format for non-stdin? + my $fmt = $lei->{opt}->{'format'}; + # e.g. 
Maildir:/home/user/Mail/ or imaps://example.com/INBOX + for my $input (@inputs) { + my $input_path = $input; + if ($input =~ m!\A(?:imap|nntp)s?://!i) { + require PublicInbox::NetReader; + $nrd //= PublicInbox::NetReader->new; + $nrd->add_url($input); + } elsif ($input_path =~ s/\A([a-z0-9]+)://is) { + my $ifmt = lc $1; + if (($fmt // $ifmt) ne $ifmt) { + return $lei->fail(<<""); +--format=$fmt and `$ifmt:' conflict + } - } - if (@f) { $lei->check_input_format(\@f) or return } - if ($lei->{nrd} && (my @err = $lei->{nrd}->errors)) { - return $lei->fail(@err); - } + if (-f $input_path) { + require PublicInbox::MboxReader; + PublicInbox::MboxReader->can($ifmt) or return + $lei->fail("$ifmt not supported"); + } elsif (-d _) { + require PublicInbox::MdirReader; + $ifmt eq 'maildir' or return + $lei->fail("$ifmt not supported"); + } else { + return $lei->fail("Unable to handle $input"); + } + } elsif (-f $input) { push @f, $input + } elsif (-d _) { push @d, $input + } else { return $lei->fail("Unable to handle $input") } } - do_import($lei); + if (@f) { $lei->check_input_format(\@f) or return } + if (@d) { # TODO: check for MH vs Maildir, here + require PublicInbox::MdirReader; + } + $self->{inputs} = \@inputs; + return import_start($lei) if !$nrd; + + if (my $err = $nrd->errors) { + return $lei->fail($err); + } + $nrd->{quiet} = $lei->{opt}->{quiet}; + $lei->{nrd} = $nrd; + require PublicInbox::LeiAuth; + my $auth = $lei->{auth} = PublicInbox::LeiAuth->new($nrd); + $auth->auth_start($lei, \&import_start, $lei); } sub ipc_atfork_child { my ($self) = @_; + delete $self->{lei}->{imp}; # drop circular ref $self->{lei}->lei_atfork_child; $self->SUPER::ipc_atfork_child; } sub _import_fh { - my ($lei, $fh, $x) = @_; + my ($lei, $fh, $input, $ifmt) = @_; my $set_kw = $lei->{opt}->{kw}; - my $fmt = $lei->{opt}->{'format'}; eval { - if ($fmt eq 'eml') { + if ($ifmt eq 'eml') { my $buf = do { local $/; <$fh> } // - return $lei->child_error(1 >> 8, <<""); -error reading $x: $! 
+ return $lei->child_error(1 << 8, <<""); +error reading $input: $! my $eml = PublicInbox::Eml->new(\$buf); _import_eml($eml, $lei->{sto}, $set_kw); } else { # some mbox (->can already checked in call); - my $cb = PublicInbox::MboxReader->can($fmt) // - die "BUG: bad fmt=$fmt"; + my $cb = PublicInbox::MboxReader->can($ifmt) // + die "BUG: bad fmt=$ifmt"; $cb->(undef, $fh, \&_import_eml, $lei->{sto}, $set_kw); } }; - $lei->child_error(1 >> 8, "<stdin>: $@") if $@; + $lei->child_error(1 << 8, "<stdin>: $@") if $@; } sub _import_maildir { # maildir_each_file cb @@ -122,27 +153,45 @@ sub _import_maildir { # maildir_each_file cb $sto->ipc_do('set_eml_from_maildir', $f, $set_kw); } +sub _import_imap { # imap_each cb + my ($url, $uid, $kw, $eml, $sto, $set_kw) = @_; + warn "$url $uid"; + $sto->ipc_do('set_eml', $eml, $set_kw ? @$kw : ()); +} + sub import_path_url { - my ($self, $x) = @_; + my ($self, $input) = @_; my $lei = $self->{lei}; + my $ifmt = lc($lei->{opt}->{'format'} // ''); # TODO auto-detect? - if (-f $x) { - open my $fh, '<', $x or return $lei->child_error(1 >> 8, <<""); -unable to open $x: $! + if ($input =~ m!\A(imap|nntp)s?://!i) { + $lei->{nrd}->imap_each($input, \&_import_imap, $lei->{sto}, + $lei->{opt}->{kw}); + return; + } elsif ($input =~ s!\A([a-z0-9]+):!!i) { + $ifmt = lc $1; + } + if (-f $input) { + open my $fh, '<', $input or return $lei->child_error(1 << 8, <<""); +unable to open $input: $! 
- _import_fh($lei, $fh, $x); - } elsif (-d _ && (-d "$x/cur" || -d "$x/new")) { - PublicInbox::MdirReader::maildir_each_file($x, + _import_fh($lei, $fh, $input, $ifmt); + } elsif (-d _ && (-d "$input/cur" || -d "$input/new")) { + return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir'; +$input appears to a be a maildir, not $ifmt +EOM + PublicInbox::MdirReader::maildir_each_file($input, \&_import_maildir, $lei->{sto}, $lei->{opt}->{kw}); } else { - $lei->fail("$x unsupported (TODO)"); + $lei->fail("$input unsupported (TODO)"); } } sub import_stdin { my ($self) = @_; - _import_fh($self->{lei}, $self->{0}, '<stdin>'); + my $lei = $self->{lei}; + _import_fh($lei, delete $self->{0}, '<stdin>', $lei->{opt}->{'format'}); } 1; diff --git a/t/lei-import-imap.t b/t/lei-import-imap.t new file mode 100644 index 00000000..ee308723 --- /dev/null +++ b/t/lei-import-imap.t @@ -0,0 +1,28 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; +require_git 2.6; +require_mods(qw(DBD::SQLite Search::Xapian)); +my ($ro_home, $cfg_path) = setup_public_inboxes; +my ($tmpdir, $for_destroy) = tmpdir; +my $sock = tcp_server; +my $cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/1", "--stderr=$tmpdir/2" ]; +my $env = { PI_CONFIG => $cfg_path }; +my $td = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-imapd: $?"); +my $host_port = tcp_host_port($sock); +undef $sock; +test_lei({ tmpdir => $tmpdir }, sub { + lei_ok(qw(q bytes:1..)); + my $out = json_utf8->decode($lei_out); + is_deeply($out, [ undef ], 'nothing imported, yet'); + lei_ok('import', "imap://$host_port/t.v2.0"); + lei_ok(qw(q bytes:1..)); + $out = json_utf8->decode($lei_out); + ok(scalar(@$out) > 1, 'got imported messages'); + is(pop @$out, undef, 'trailing JSON null element was null'); + my %r; + for (@$out) { $r{ref($_)}++ } + is_deeply(\%r, { 'HASH' => scalar(@$out) }, 'all hashes'); 
+}); +done_testing; diff --git a/t/lei-import-maildir.t b/t/lei-import-maildir.t index 5842e19e..d2b059ad 100644 --- a/t/lei-import-maildir.t +++ b/t/lei-import-maildir.t @@ -23,8 +23,8 @@ test_lei(sub { is_deeply($r2, $res, 'idempotent import'); rename("$md/cur/x:2,S", "$md/cur/x:2,SR") or BAIL_OUT "rename: $!"; - ok($lei->(qw(import), $md), 'import Maildir after +answered'); - ok($lei->(qw(q -d none s:boolean)), 'lei q after +answered'); + lei_ok('import', "maildir:$md", \'import Maildir after +answered'); + lei_ok(qw(q -d none s:boolean), \'lei q after +answered'); $res = json_utf8->decode($lei_out); like($res->[0]->{'s'}, qr/use boolean/, 'got expected result'); is_deeply($res->[0]->{kw}, ['answered', 'seen'], 'keywords set'); diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index 6a571660..72b90700 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -139,6 +139,16 @@ test_lei(sub { is($res->[1], undef, 'only one result'); }); +test_lei(sub { + lei_ok('import', "$mbox:$fn", \'imported mbox:/path') or diag $lei_err; + lei_ok(qw(q s:x), \'lei q works') or diag $lei_err; + my $res = json_utf8->decode($lei_out); + my $x = $res->[0]; + is($x->{'s'}, 'x', 'subject imported') or diag $lei_out; + is_deeply($x->{'kw'}, ['seen'], 'kw imported') or diag $lei_out; + is($res->[1], undef, 'only one result'); +}); + for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? my $zsfx2cmd = PublicInbox::LeiToMail->can('zsfx2cmd'); SKIP: {
The backends for "lei add-external --mirror", "lei convert", and "lei import" all share a similar pattern for spawning background workers. Hoist out the common parts to slim down our code base a bit. The LeiXSearch and LeiToMail workers for "lei q" remains a the odd duck due to the deep pipelining and parallelization. --- lib/PublicInbox/LEI.pm | 19 +++++++++++++++++++ lib/PublicInbox/LeiAuth.pm | 17 +++-------------- lib/PublicInbox/LeiConvert.pm | 22 +++++----------------- lib/PublicInbox/LeiImport.pm | 19 ++++--------------- lib/PublicInbox/LeiMirror.pm | 19 ++++--------------- 5 files changed, 35 insertions(+), 61 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 1e4c36d0..0b4bc20e 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -468,6 +468,25 @@ sub lei_atfork_child { $current_lei = $persist ? undef : $self; # for SIG{__WARN__} } +sub workers_start { + my ($lei, $wq, $ident, $jobs, $ops) = @_; + $ops = { + '!' => [ $lei->can('fail_handler'), $lei ], + '|' => [ $lei->can('sigpipe_handler'), $lei ], + 'x_it' => [ $lei->can('x_it'), $lei ], + 'child_error' => [ $lei->can('child_error'), $lei ], + %$ops + }; + require PublicInbox::PktOp; + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei }); + delete $lei->{pkt_op_p}; + my $op = delete $lei->{pkt_op_c}; + $lei->event_step_init; + # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op + $lei->{oneshot} ? $op : undef; +} + sub _help { require PublicInbox::LeiHelp; PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC); diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm index 88310874..7210af99 100644 --- a/lib/PublicInbox/LeiAuth.pm +++ b/lib/PublicInbox/LeiAuth.pm @@ -42,24 +42,13 @@ sub auth_eof { sub auth_start { my ($self, $lei, $post_auth_cb, @args) = @_; - my $ops = { - '!' 
=> [ $lei->can('fail_handler'), $lei ], - '|' => [ $lei->can('sigpipe_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], + my $op = $lei->workers_start($self, 'auth', 1, { 'nrd_merge' => [ \&nrd_merge, $lei ], '' => [ \&auth_eof, $lei, $post_auth_cb, @args ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); - $self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + }); $self->wq_io_do('do_auth', []); $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 78fd5e17..ba375772 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -8,7 +8,6 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::InboxWritable qw(eml_from_path); -use PublicInbox::PktOp; use PublicInbox::LeiStore; use PublicInbox::LeiOverview; @@ -59,26 +58,15 @@ sub do_convert { # via wq_do delete $self->{wcb}; # commit } -sub convert_start { +sub convert_start { # LeiAuth->auth_start callback my ($lei) = @_; - my $ops = { - '!' 
=> [ $lei->can('fail_handler'), $lei ], - '|' => [ $lei->can('sigpipe_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - '' => [ $lei->can('dclose'), $lei ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); my $self = $lei->{cnv}; - $self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + my $op = $lei->workers_start($self, 'lei_convert', 1, { + '' => [ $lei->can('dclose'), $lei ] + }); $self->wq_io_do('do_convert', []); $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub call { # the main "lei convert" method diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 62a2a412..68cab12c 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -8,7 +8,6 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::InboxWritable qw(eml_from_path); -use PublicInbox::PktOp; sub _import_eml { # MboxReader callback my ($eml, $sto, $set_kw) = @_; @@ -31,13 +30,6 @@ sub import_done { # EOF callback for main daemon sub import_start { my ($lei) = @_; - my $ops = { - '!' 
=> [ $lei->can('fail_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - '' => [ \&import_done, $lei ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); my $self = $lei->{imp}; my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1; if (my $nrd = $lei->{nrd}) { @@ -46,18 +38,15 @@ sub import_start { my $nproc = $self->detect_nproc; $j = $nproc if $j > $nproc; } - $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + my $op = $lei->workers_start($self, 'lei_import', $j, { + '' => [ \&import_done, $lei ], + }); $self->wq_io_do('import_stdin', []) if $self->{0}; for my $input (@{$self->{inputs}}) { $self->wq_io_do('import_path_url', [], $input); } $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub call { # the main "lei import" method diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index c5153148..f8ca1ee5 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -8,7 +8,6 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use IO::Uncompress::Gunzip qw(gunzip $GunzipError); use PublicInbox::Spawn qw(popen_rd spawn); -use PublicInbox::PktOp; sub do_finish_mirror { # dwaitpid callback my ($arg, $pid) = @_; @@ -279,22 +278,12 @@ sub start { require PublicInbox::Inbox; require PublicInbox::Admin; require PublicInbox::InboxWritable; - my $ops = { - '!' 
=> [ $lei->can('fail_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - '' => [ \&mirror_done, $lei ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); - $self->wq_workers_start('lei_mirror', 1, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + my $op = $lei->workers_start($self, 'lei_mirror', 1, { + '' => [ \&mirror_done, $lei ] + }); $self->wq_io_do('do_mirror', []); $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub ipc_atfork_child {
We need to ensure authentication failures and error codes get propagated to the parent process(es) properly. v2: update MANIFEST --- MANIFEST | 1 + lib/PublicInbox/LeiAuth.pm | 1 + lib/PublicInbox/NetReader.pm | 3 +++ xt/lei-auth-fail.t | 20 ++++++++++++++++++++ 4 files changed, 25 insertions(+) create mode 100644 xt/lei-auth-fail.t diff --git a/MANIFEST b/MANIFEST index 19f73356..3d9ad616 100644 --- a/MANIFEST +++ b/MANIFEST @@ -466,6 +466,7 @@ xt/git_async_cmp.t xt/httpd-async-stream.t xt/imapd-mbsync-oimap.t xt/imapd-validate.t +xt/lei-auth-fail.t xt/lei-sigpipe.t xt/mem-imapd-tls.t xt/mem-msgview.t diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm index 7210af99..7acb9900 100644 --- a/lib/PublicInbox/LeiAuth.pm +++ b/lib/PublicInbox/LeiAuth.pm @@ -42,6 +42,7 @@ sub auth_eof { sub auth_start { my ($self, $lei, $post_auth_cb, @args) = @_; + $lei->_lei_cfg(1); # workers may need to read config my $op = $lei->workers_start($self, 'auth', 1, { 'nrd_merge' => [ \&nrd_merge, $lei ], '' => [ \&auth_eof, $lei, $post_auth_cb, @args ], diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index ad8c18d0..61ea538b 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -89,6 +89,9 @@ sub mic_for { # mic = Mail::IMAPClient $self->{mic_arg}->{uri_section($uri)} = $mic_arg; } else { $err = "E: <$url> LOGIN: $@\n"; + if ($cred && defined($cred->{password})) { + $err =~ s/\Q$cred->{password}\E/*******/g; + } $mic = undef; } $cred->run($mic ? 
'approve' : 'reject') if $cred; diff --git a/xt/lei-auth-fail.t b/xt/lei-auth-fail.t new file mode 100644 index 00000000..5308d0f9 --- /dev/null +++ b/xt/lei-auth-fail.t @@ -0,0 +1,20 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; + +# TODO: mock IMAP server which fails at authentication so we don't +# have to make external connections to test this: +my $imap_fail = $ENV{TEST_LEI_IMAP_FAIL_URL} // + 'imaps://AzureDiamond:Hunter2@public-inbox.org:994/INBOX'; +test_lei(sub { + ok(!lei(qw(convert -o mboxrd:/dev/stdout), $imap_fail), + 'IMAP auth failure on convert'); + like($lei_err, qr!\bE:.*?imaps://.*?!sm, 'error shown'); + unlike($lei_err, qr!Hunter2!s, 'password not shown'); + is($lei_out, '', 'nothing output'); + ok(!lei(qw(import), $imap_fail), 'IMAP auth failure on import'); + like($lei_err, qr!\bE:.*?imaps://.*?!sm, 'error shown'); + unlike($lei_err, qr!Hunter2!s, 'password not shown'); +}); +done_testing;
Fixed to set up ->_lei_cfg in LeiAuth->auth_start in PATCH 1/4 instead of 4/4. This fixes failures on my FreeBSD 11.x VM where 1/4 alone was failing (I never caught this on Debian 10.x). Eric Wong (4): lei convert: mail format conversion sub-command lei import: add IMAP and (maildir|mbox*):$PATHNAME support lei: consolidate the bulk of the IPC code lei: check for IMAP auth errors MANIFEST | 6 ++ lib/PublicInbox/GitCredential.pm | 18 ++-- lib/PublicInbox/LEI.pm | 57 +++++++++-- lib/PublicInbox/LeiAuth.pm | 70 +++++++++++++ lib/PublicInbox/LeiConvert.pm | 148 +++++++++++++++++++++++++++ lib/PublicInbox/LeiDedupe.pm | 2 +- lib/PublicInbox/LeiImport.pm | 148 +++++++++++++++++---------- lib/PublicInbox/LeiMirror.pm | 19 +--- lib/PublicInbox/LeiOverview.pm | 7 +- lib/PublicInbox/LeiToMail.pm | 5 +- lib/PublicInbox/MdirReader.pm | 26 +++++ lib/PublicInbox/NetReader.pm | 166 ++++++++++++++++++++++++++++--- lib/PublicInbox/TestCommon.pm | 11 +- t/lei-convert.t | 71 +++++++++++++ t/lei-import-imap.t | 28 ++++++ t/lei-import-maildir.t | 4 +- t/lei_to_mail.t | 10 ++ t/net_reader-imap.t | 40 ++++++++ xt/lei-auth-fail.t | 20 ++++ 19 files changed, 747 insertions(+), 109 deletions(-) create mode 100644 lib/PublicInbox/LeiAuth.pm create mode 100644 lib/PublicInbox/LeiConvert.pm create mode 100644 t/lei-convert.t create mode 100644 t/lei-import-imap.t create mode 100644 t/net_reader-imap.t create mode 100644 xt/lei-auth-fail.t
This will make testing IMAP support for other commands easier, as it doesn't write to lei/store at all. Like the pager and MUA, "git credential" is always spawned by script/lei (and not lei-daemon) so it has a controlling terminal for password prompts. v2: fix missing requires, correct test ordering v3: ensure config exists for IMAP auth --- MANIFEST | 4 + lib/PublicInbox/GitCredential.pm | 18 ++-- lib/PublicInbox/LEI.pm | 38 +++++-- lib/PublicInbox/LeiAuth.pm | 81 +++++++++++++++ lib/PublicInbox/LeiConvert.pm | 160 ++++++++++++++++++++++++++++++ lib/PublicInbox/LeiDedupe.pm | 2 +- lib/PublicInbox/LeiOverview.pm | 7 +- lib/PublicInbox/LeiToMail.pm | 5 +- lib/PublicInbox/MdirReader.pm | 26 +++++ lib/PublicInbox/NetReader.pm | 163 ++++++++++++++++++++++++++++--- lib/PublicInbox/TestCommon.pm | 11 ++- t/lei-convert.t | 71 ++++++++++++++ t/net_reader-imap.t | 40 ++++++++ 13 files changed, 589 insertions(+), 37 deletions(-) create mode 100644 lib/PublicInbox/LeiAuth.pm create mode 100644 lib/PublicInbox/LeiConvert.pm create mode 100644 t/lei-convert.t create mode 100644 t/net_reader-imap.t diff --git a/MANIFEST b/MANIFEST index 82068900..4f146771 100644 --- a/MANIFEST +++ b/MANIFEST @@ -178,6 +178,8 @@ lib/PublicInbox/InputPipe.pm lib/PublicInbox/Isearch.pm lib/PublicInbox/KQNotify.pm lib/PublicInbox/LEI.pm +lib/PublicInbox/LeiAuth.pm +lib/PublicInbox/LeiConvert.pm lib/PublicInbox/LeiCurl.pm lib/PublicInbox/LeiDedupe.pm lib/PublicInbox/LeiExternal.pm @@ -360,6 +362,7 @@ t/init.t t/ipc.t t/iso-2202-jp.eml t/kqnotify.t +t/lei-convert.t t/lei-daemon.t t/lei-externals.t t/lei-import-maildir.t @@ -388,6 +391,7 @@ t/msg_iter.t t/msgmap.t t/msgtime.t t/multi-mid.t +t/net_reader-imap.t t/nntp.t t/nntpd-tls.t t/nntpd-v2.t diff --git a/lib/PublicInbox/GitCredential.pm b/lib/PublicInbox/GitCredential.pm index 9e193029..2d81817c 100644 --- a/lib/PublicInbox/GitCredential.pm +++ b/lib/PublicInbox/GitCredential.pm @@ -4,11 +4,17 @@ package PublicInbox::GitCredential; use strict; use 
PublicInbox::Spawn qw(popen_rd); -sub run ($$) { - my ($self, $op) = @_; - my ($in_r, $in_w); +sub run ($$;$) { + my ($self, $op, $lei) = @_; + my ($in_r, $in_w, $out_r); + my $cmd = [ qw(git credential), $op ]; pipe($in_r, $in_w) or die "pipe: $!"; - my $out_r = popen_rd([qw(git credential), $op], undef, { 0 => $in_r }); + if ($lei && !$lei->{oneshot}) { # we'll die if disconnected: + pipe($out_r, my $out_w) or die "pipe: $!"; + $lei->send_exec_cmd([ $in_r, $out_w ], $cmd, {}); + } else { + $out_r = popen_rd($cmd, undef, { 0 => $in_r }); + } close $in_r or die "close in_r: $!"; my $out = ''; @@ -41,8 +47,8 @@ sub check_netrc ($) { } sub fill { - my ($self) = @_; - my $out_r = run($self, 'fill'); + my ($self, $lei) = @_; + my $out_r = run($self, 'fill', $lei); while (<$out_r>) { chomp; return if $_ eq ''; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 1fa9f751..1e4c36d0 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -173,7 +173,11 @@ our %CMD = ( # sorted in order of importance/use: qw(stdin| offset=i recursive|r exclude=s include|I=s format|f=s kw|keywords|flags!), ], - +'convert' => [ 'LOCATION...|--stdin', + 'one-time conversion from URL or filesystem to another format', + qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s quiet|q + kw|keywords|flags!), + ], 'config' => [ '[...]', sub { 'git-config(1) wrapper for '._config_path($_[0]); }, qw(config-file|system|global|file|f=s), # for conflict detection @@ -320,7 +324,7 @@ my %CONFIG_KEYS = ( 'leistore.dir' => 'top-level storage location', ); -my @WQ_KEYS = qw(lxs l2m imp mrr); # internal workers +my @WQ_KEYS = qw(lxs l2m imp mrr cnv auth); # internal workers # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE sub x_it ($$) { @@ -391,18 +395,19 @@ sub fail ($$;$) { undef; } -sub check_input_format ($;$) { - my ($self, $files) = @_; - my $fmt = $self->{opt}->{'format'}; +sub check_input_format ($;$$) { + my ($self, $files, $opt_key) = @_; + $opt_key //= 
'format'; + my $fmt = $self->{opt}->{$opt_key}; if (!$fmt) { my $err = $files ? "regular file(s):\n@$files" : '--stdin'; - return fail($self, "--format unset for $err"); + return fail($self, "--$opt_key unset for $err"); } return 1 if $fmt eq 'eml'; # XXX: should this handle {gz,bz2,xz}? that's currently in LeiToMail require PublicInbox::MboxReader; PublicInbox::MboxReader->can($fmt) || - fail($self, "--format=$fmt unrecognized"); + fail($self, "--$opt_key=$fmt unrecognized"); } sub out ($;@) { @@ -445,6 +450,7 @@ sub lei_atfork_child { } else { delete $self->{0}; } + delete @$self{qw(cnv)}; for (delete @$self{qw(3 sock old_1 au_done)}) { close($_) if defined($_); } @@ -626,6 +632,11 @@ sub lei_import { PublicInbox::LeiImport->call(@_); } +sub lei_convert { + require PublicInbox::LeiConvert; + PublicInbox::LeiConvert->call(@_); +} + sub lei_init { my ($self, $dir) = @_; my $cfg = _lei_cfg($self, 1); @@ -770,6 +781,13 @@ sub start_mua { delete $self->{opt}->{verbose}; } +sub send_exec_cmd { # tell script/lei to execute a command + my ($self, $io, $cmd, $env) = @_; + my $sock = $self->{sock} // die 'lei client gone'; + my $fds = [ map { fileno($_) } @$io ]; + $send_cmd->($sock, $fds, exec_buf($cmd, $env), MSG_EOR); +} + sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail my ($self) = @_; my $alerts = $self->{opt}->{alert} // return; @@ -813,10 +831,9 @@ sub start_pager { pipe(my ($r, $wpager)) or return warn "pipe: $!"; my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} }; my $pgr = [ undef, @$rdr{1, 2} ]; - if (my $sock = $self->{sock}) { # lei(1) process runs it + if ($self->{sock}) { # lei(1) process runs it delete @$new_env{keys %$env}; # only set iff unset - my $fds = [ map { fileno($_) } @$rdr{0..2} ]; - $send_cmd->($sock, $fds, exec_buf([$pager], $new_env), MSG_EOR); + send_exec_cmd($self, [ @$rdr{0..2} ], [$pager], $new_env); } elsif ($self->{oneshot}) { my $cmd = [$pager]; $self->{"pid.$self.$$"}->{spawn($cmd, $new_env, $rdr)} = 
$cmd; @@ -920,6 +937,7 @@ sub event_step { sub event_step_init { my ($self) = @_; + return if $self->{-event_init_done}++; if (my $sock = $self->{sock}) { # using DS->EventLoop $self->SUPER::new($sock, EPOLLIN|EPOLLET); } diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm new file mode 100644 index 00000000..6593ba51 --- /dev/null +++ b/lib/PublicInbox/LeiAuth.pm @@ -0,0 +1,81 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# Authentication worker for anything that needs auth for read/write IMAP +# (eventually for read-only NNTP access) +package PublicInbox::LeiAuth; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC); +use PublicInbox::PktOp qw(pkt_do); +use PublicInbox::NetReader; + +sub nrd_merge { + my ($lei, $nrd_new) = @_; + if ($lei->{pkt_op_p}) { # from lei_convert worker + pkt_do($lei->{pkt_op_p}, 'nrd_merge', $nrd_new); + } else { # single lei-daemon consumer + my $self = $lei->{auth} or return; # client disconnected + my $nrd = $self->{nrd}; + %$nrd = (%$nrd, %$nrd_new); + } +} + +sub do_auth { # called via wq_io_do + my ($self) = @_; + my ($lei, $nrd) = @$self{qw(lei nrd)}; + $nrd->imap_common_init($lei); + nrd_merge($lei, $nrd); # tell lei-daemon updated auth info +} + +sub do_finish_auth { # dwaitpid callback + my ($arg, $pid) = @_; + my ($self, $lei, $post_auth_cb, @args) = @$arg; + $? ? $lei->dclose : $post_auth_cb->(@args); +} + +sub auth_eof { + my ($lei, $post_auth_cb, @args) = @_; + my $self = delete $lei->{auth} or return; + $self->wq_wait_old(\&do_finish_auth, $lei, $post_auth_cb, @args); +} + +sub auth_start { + my ($self, $lei, $post_auth_cb, @args) = @_; + $lei->_lei_cfg(1); # workers may need to read config + my $ops = { + '!' 
=> [ $lei->can('fail_handler'), $lei ], + '|' => [ $lei->can('sigpipe_handler'), $lei ], + 'x_it' => [ $lei->can('x_it'), $lei ], + 'child_error' => [ $lei->can('child_error'), $lei ], + 'nrd_merge' => [ \&nrd_merge, $lei ], + '' => [ \&auth_eof, $lei, $post_auth_cb, @args ], + }; + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + $self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei}); + my $op = delete $lei->{pkt_op_c}; + delete $lei->{pkt_op_p}; + $self->wq_io_do('do_auth', []); + $self->wq_close(1); + $lei->event_step_init; # wait for shutdowns + if ($lei->{oneshot}) { + while ($op->{sock}) { $op->event_step } + } +} + +sub ipc_atfork_child { + my ($self) = @_; + # prevent {sock} from being closed in lei_atfork_child: + my $s = delete $self->{lei}->{sock}; + delete $self->{lei}->{auth}; # drop circular ref + $self->{lei}->lei_atfork_child; + $self->{lei}->{sock} = $s if $s; + $self->SUPER::ipc_atfork_child; +} + +sub new { + my ($cls, $nrd) = @_; + bless { nrd => $nrd }, $cls; +} + +1; diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm new file mode 100644 index 00000000..78fd5e17 --- /dev/null +++ b/lib/PublicInbox/LeiConvert.pm @@ -0,0 +1,160 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# front-end for the "lei convert" sub-command +package PublicInbox::LeiConvert; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC); +use PublicInbox::Eml; +use PublicInbox::InboxWritable qw(eml_from_path); +use PublicInbox::PktOp; +use PublicInbox::LeiStore; +use PublicInbox::LeiOverview; + +sub mbox_cb { + my ($eml, $self) = @_; + my @kw = PublicInbox::LeiStore::mbox_keywords($eml); + $eml->header_set($_) for qw(Status X-Status); + $self->{wcb}->(undef, { kw => \@kw }, $eml); +} + +sub imap_cb { # ->imap_each + my ($url, $uid, $kw, $eml, $self) = @_; + $self->{wcb}->(undef, { kw => $kw }, $eml); +} + +sub mdir_cb { + my 
($kw, $eml, $self) = @_; + $self->{wcb}->(undef, { kw => $kw }, $eml); +} + +sub do_convert { # via wq_do + my ($self) = @_; + my $lei = $self->{lei}; + my $in_fmt = $lei->{opt}->{'in-format'}; + if (my $stdin = delete $self->{0}) { + PublicInbox::MboxReader->$in_fmt($stdin, \&mbox_cb, $self); + } + for my $input (@{$self->{inputs}}) { + my $ifmt = lc($in_fmt // ''); + if ($input =~ m!\A(?:imap|nntp)s?://!) { # TODO: nntp + $lei->{nrd}->imap_each($input, \&imap_cb, $self); + next; + } elsif ($input =~ s!\A([a-z0-9]+):!!i) { + $ifmt = lc $1; + } + if (-f $input) { + open my $fh, '<', $input or + return $lei->fail("open $input: $!"); + PublicInbox::MboxReader->$ifmt($fh, \&mbox_cb, $self); + } elsif (-d _) { + PublicInbox::MdirReader::maildir_each_eml($input, + \&mdir_cb, $self); + } else { + die "BUG: $input unhandled"; # should've failed earlier + } + } + delete $lei->{1}; + delete $self->{wcb}; # commit +} + +sub convert_start { + my ($lei) = @_; + my $ops = { + '!' => [ $lei->can('fail_handler'), $lei ], + '|' => [ $lei->can('sigpipe_handler'), $lei ], + 'x_it' => [ $lei->can('x_it'), $lei ], + 'child_error' => [ $lei->can('child_error'), $lei ], + '' => [ $lei->can('dclose'), $lei ], + }; + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + my $self = $lei->{cnv}; + $self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei}); + my $op = delete $lei->{pkt_op_c}; + delete $lei->{pkt_op_p}; + $self->wq_io_do('do_convert', []); + $self->wq_close(1); + $lei->event_step_init; # wait for shutdowns + if ($lei->{oneshot}) { + while ($op->{sock}) { $op->event_step } + } +} + +sub call { # the main "lei convert" method + my ($cls, $lei, @inputs) = @_; + my $opt = $lei->{opt}; + $opt->{kw} //= 1; + my $self = $lei->{cnv} = bless {}, $cls; + my $in_fmt = $opt->{'in-format'}; + my ($nrd, @f, @d); + $opt->{dedupe} //= 'none'; + my $ovv = PublicInbox::LeiOverview->new($lei, 'out-format'); + $lei->{l2m} or return + $lei->fail("output not 
specified or is not a mail destination"); + $opt->{augment} = 1 unless $ovv->{dst} eq '/dev/stdout'; + if ($opt->{stdin}) { + @inputs and return $lei->fail("--stdin and @inputs do not mix"); + $lei->check_input_format(undef, 'in-format') or return; + $self->{0} = $lei->{0}; + } + # e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX + for my $input (@inputs) { + my $input_path = $input; + if ($input =~ m!\A(?:imap|nntp)s?://!i) { + require PublicInbox::NetReader; + $nrd //= PublicInbox::NetReader->new; + $nrd->add_url($input); + } elsif ($input_path =~ s/\A([a-z0-9]+)://is) { + my $ifmt = lc $1; + if (($in_fmt // $ifmt) ne $ifmt) { + return $lei->fail(<<""); +--in-format=$in_fmt and `$ifmt:' conflict + + } + if (-f $input_path) { + require PublicInbox::MboxReader; + PublicInbox::MboxReader->can($ifmt) or return + $lei->fail("$ifmt not supported"); + } elsif (-d _) { + require PublicInbox::MdirReader; + $ifmt eq 'maildir' or return + $lei->fail("$ifmt not supported"); + } else { + return $lei->fail("Unable to handle $input"); + } + } elsif (-f $input) { push @f, $input } + elsif (-d _) { push @d, $input } + else { return $lei->fail("Unable to handle $input") } + } + if (@f) { $lei->check_input_format(\@f, 'in-format') or return } + if (@d) { # TODO: check for MH vs Maildir, here + require PublicInbox::MdirReader; + } + $self->{inputs} = \@inputs; + return convert_start($lei) if !$nrd; + + if (my $err = $nrd->errors) { + return $lei->fail($err); + } + $nrd->{quiet} = $opt->{quiet}; + $lei->{nrd} = $nrd; + require PublicInbox::LeiAuth; + my $auth = $lei->{auth} = PublicInbox::LeiAuth->new($nrd); + $auth->auth_start($lei, \&convert_start, $lei); +} + +sub ipc_atfork_child { + my ($self) = @_; + my $lei = $self->{lei}; + $lei->lei_atfork_child; + my $l2m = delete $lei->{l2m}; + $l2m->pre_augment($lei); + $l2m->do_augment($lei); + $l2m->post_augment($lei); + $self->{wcb} = $l2m->write_cb($lei); + $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); + 
$self->SUPER::ipc_atfork_child; +} + +1; diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm index 2114c0e8..5fec9384 100644 --- a/lib/PublicInbox/LeiDedupe.pm +++ b/lib/PublicInbox/LeiDedupe.pm @@ -127,7 +127,7 @@ sub prepare_dedupe { sub pause_dedupe { my ($self) = @_; - my $skv = $self->[0]; + my $skv = $self->[0] or return; $skv->dbh_release; delete($skv->{dbh}) if $skv; } diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index c820f0d7..3169bae6 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -51,18 +51,19 @@ sub detect_fmt ($$) { } sub new { - my ($class, $lei) = @_; + my ($class, $lei, $ofmt_key) = @_; my $opt = $lei->{opt}; my $dst = $opt->{output} // '-'; $dst = '/dev/stdout' if $dst eq '-'; + $ofmt_key //= 'format'; - my $fmt = $opt->{'format'}; + my $fmt = $opt->{$ofmt_key}; $fmt = lc($fmt) if defined $fmt; if ($dst =~ s/\A([a-z0-9]+)://is) { # e.g. Maildir:/home/user/Mail/ my $ofmt = lc $1; $fmt //= $ofmt; return $lei->fail(<<"") if $fmt ne $ofmt; ---format=$fmt and --output=$ofmt conflict +--$ofmt_key=$fmt and --output=$ofmt conflict } $fmt //= 'json' if $dst eq '/dev/stdout'; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index e3e512be..f0adc44f 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -437,7 +437,7 @@ sub _do_augment_mbox { $dedupe->pause_dedupe if $dedupe; } -sub pre_augment { # fast (1 disk seek), runs in main daemon +sub pre_augment { # fast (1 disk seek), runs in same process as post_augment my ($self, $lei) = @_; # _pre_augment_maildir, _pre_augment_mbox my $m = "_pre_augment_$self->{base_type}"; @@ -451,7 +451,8 @@ sub do_augment { # slow, runs in wq worker $self->$m($lei); } -sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon +# fast (spawn compressor or mkdir), runs in same process as pre_augment +sub post_augment { my ($self, $lei, @args) = @_; # _post_augment_maildir, 
_post_augment_mbox my $m = "_post_augment_$self->{base_type}"; diff --git a/lib/PublicInbox/MdirReader.pm b/lib/PublicInbox/MdirReader.pm index e0ff676d..5fa534f5 100644 --- a/lib/PublicInbox/MdirReader.pm +++ b/lib/PublicInbox/MdirReader.pm @@ -7,6 +7,7 @@ package PublicInbox::MdirReader; use strict; use v5.10.1; +use PublicInbox::InboxWritable qw(eml_from_path); # returns Maildir flags from a basename ('' for no flags, undef for invalid) sub maildir_basename_flags { @@ -36,4 +37,29 @@ sub maildir_each_file ($$;@) { } } +my %c2kw = ('D' => 'draft', F => 'flagged', R => 'answered', S => 'seen'); + +sub maildir_each_eml ($$;@) { + my ($dir, $cb, @arg) = @_; + $dir .= '/' unless substr($dir, -1) eq '/'; + my $pfx = "$dir/new/"; + if (opendir(my $dh, $pfx)) { + while (defined(my $bn = readdir($dh))) { + next if substr($bn, 0, 1) eq '.'; + my @f = split(/:/, $bn, -1); + next if scalar(@f) != 1; + my $eml = eml_from_path($pfx.$bn) or next; + $cb->([], $eml, @arg); + } + } + $pfx = "$dir/cur/"; + opendir my $dh, $pfx or return; + while (defined(my $bn = readdir($dh))) { + my $fl = maildir_basename_flags($bn) // next; + my $eml = eml_from_path($pfx.$bn) or next; + my @kw = sort(map { $c2kw{$_} // () } split(//, $fl)); + $cb->(\@kw, $eml, @arg); + } +} + 1; diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index 1d053425..ad8c18d0 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -5,7 +5,8 @@ package PublicInbox::NetReader; use strict; use v5.10.1; -use parent qw(Exporter); +use parent qw(Exporter PublicInbox::IPC); +use PublicInbox::Eml; # TODO: trim this down, this is huge our @EXPORT = qw(uri_new uri_scheme uri_section @@ -33,7 +34,7 @@ sub uri_section ($) { sub auth_anon_cb { '' }; # for Mail::IMAPClient::Authcallback sub mic_for { # mic = Mail::IMAPClient - my ($self, $url, $mic_args) = @_; + my ($self, $url, $mic_args, $lei) = @_; require PublicInbox::URIimap; my $uri = PublicInbox::URIimap->new($url); require 
PublicInbox::GitCredential; @@ -74,21 +75,26 @@ sub mic_for { # mic = Mail::IMAPClient } if ($cred) { $cred->check_netrc unless defined $cred->{password}; - $cred->fill; # may prompt user here + $cred->fill($lei); # may prompt user here $mic->User($mic_arg->{User} = $cred->{username}); $mic->Password($mic_arg->{Password} = $cred->{password}); } else { # AUTH=ANONYMOUS $mic->Authmechanism($mic_arg->{Authmechanism} = 'ANONYMOUS'); - $mic->Authcallback($mic_arg->{Authcallback} = \&auth_anon_cb); + $mic_arg->{Authcallback} = 'auth_anon_cb'; + $mic->Authcallback(\&auth_anon_cb); } + my $err; if ($mic->login && $mic->IsAuthenticated) { # success! keep IMAPClient->new arg in case we get disconnected $self->{mic_arg}->{uri_section($uri)} = $mic_arg; } else { - warn "E: <$url> LOGIN: $@\n"; + $err = "E: <$url> LOGIN: $@\n"; $mic = undef; } $cred->run($mic ? 'approve' : 'reject') if $cred; + if ($err) { + $lei ? $lei->fail($err) : warn($err); + } $mic; } @@ -139,8 +145,8 @@ E: <$url> STARTTLS requested and failed $nn; } -sub nn_for ($$$) { # nn = Net::NNTP - my ($self, $url, $nn_args) = @_; +sub nn_for ($$$;$) { # nn = Net::NNTP + my ($self, $url, $nn_args, $lei) = @_; my $uri = uri_new($url); my $sec = uri_section($uri); my $nntp_opt = $self->{nntp_opt}->{$sec} //= {}; @@ -170,7 +176,7 @@ sub nn_for ($$$) { # nn = Net::NNTP my $nn = nn_new($nn_arg, $nntp_opt, $url); if ($cred) { - $cred->fill; # may prompt user here + $cred->fill($lei); # may prompt user here if ($nn->authinfo($u, $p)) { push @{$nntp_opt->{-postconn}}, [ 'authinfo', $u, $p ]; } else { @@ -240,14 +246,15 @@ sub cfg_bool ($$$) { } # flesh out common IMAP-specific data structures -sub imap_common_init ($) { - my ($self) = @_; +sub imap_common_init ($;$) { + my ($self, $lei) = @_; + $self->{quiet} = 1 if $lei && $lei->{opt}->{quiet}; eval { require PublicInbox::IMAPClient } or die "Mail::IMAPClient is required for IMAP:\n$@\n"; eval { require PublicInbox::IMAPTracker } or die "DBD::SQLite is required for 
IMAP\n:$@\n"; require PublicInbox::URIimap; - my $cfg = $self->{pi_cfg}; + my $cfg = $self->{pi_cfg} // $lei->_lei_cfg; my $mic_args = {}; # scheme://authority => Mail:IMAPClient arg for my $url (@{$self->{imap_order}}) { my $uri = PublicInbox::URIimap->new($url); @@ -275,7 +282,8 @@ sub imap_common_init ($) { my $mics = {}; # schema://authority => IMAPClient obj for my $url (@{$self->{imap_order}}) { my $uri = PublicInbox::URIimap->new($url); - $mics->{uri_section($uri)} //= mic_for($self, $url, $mic_args); + my $sec = uri_section($uri); + $mics->{$sec} //= mic_for($self, $url, $mic_args, $lei); } $mics; } @@ -294,9 +302,140 @@ sub errors { if (my $u = $self->{unsupported_url}) { return "Unsupported URL(s): @$u"; } + if ($self->{imap_order}) { + eval { require PublicInbox::IMAPClient } or + die "Mail::IMAPClient is required for IMAP:\n$@\n"; + } undef; } +my %IMAPflags2kw = ( + '\Seen' => 'seen', + '\Answered' => 'answered', + '\Flagged' => 'flagged', + '\Draft' => 'draft', +); + +sub _imap_do_msg ($$$$$) { + my ($self, $url, $uid, $raw, $flags) = @_; + # our target audience expects LF-only, save storage + $$raw =~ s/\r\n/\n/sg; + my $kw = []; + for my $f (split(/ /, $flags)) { + my $k = $IMAPflags2kw{$f} // next; # TODO: X-Label? 
+ push @$kw, $k; + } + my ($eml_cb, @args) = @{$self->{eml_each}}; + $eml_cb->($url, $uid, $kw, PublicInbox::Eml->new($raw), @args); +} + +sub _imap_fetch_all ($$$) { + my ($self, $mic, $url) = @_; + my $uri = PublicInbox::URIimap->new($url); + my $sec = uri_section($uri); + my $mbx = $uri->mailbox; + $mic->Clear(1); # trim results history + $mic->examine($mbx) or return "E: EXAMINE $mbx ($sec) failed: $!"; + my ($r_uidval, $r_uidnext); + for ($mic->Results) { + /^\* OK \[UIDVALIDITY ([0-9]+)\].*/ and $r_uidval = $1; + /^\* OK \[UIDNEXT ([0-9]+)\].*/ and $r_uidnext = $1; + last if $r_uidval && $r_uidnext; + } + $r_uidval //= $mic->uidvalidity($mbx) // + return "E: $url cannot get UIDVALIDITY"; + $r_uidnext //= $mic->uidnext($mbx) // + return "E: $url cannot get UIDNEXT"; + my $itrk = $self->{incremental} ? + PublicInbox::IMAPTracker->new($url) : 0; + my ($l_uidval, $l_uid) = $itrk ? $itrk->get_last : (); + $l_uidval //= $r_uidval; # first time + $l_uid //= 1; + if ($l_uidval != $r_uidval) { + return "E: $url UIDVALIDITY mismatch\n". + "E: local=$l_uidval != remote=$r_uidval"; + } + my $r_uid = $r_uidnext - 1; + if ($l_uid != 1 && $l_uid > $r_uid) { + return "E: $url local UID exceeds remote ($l_uid > $r_uid)\n". + "E: $url strangely, UIDVALIDLITY matches ($l_uidval)\n"; + } + return if $l_uid >= $r_uid; # nothing to do + + warn "# $url fetching UID $l_uid:$r_uid\n" unless $self->{quiet}; + $mic->Uid(1); # the default, we hope + my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1; + my $req = $mic->imap4rev1 ? 
'BODY.PEEK[]' : 'RFC822.PEEK'; + my $key = $req; + $key =~ s/\.PEEK//; + my ($uids, $batch); + my $err; + do { + # I wish "UID FETCH $START:*" could work, but: + # 1) servers do not need to return results in any order + # 2) Mail::IMAPClient doesn't offer a streaming API + $uids = $mic->search("UID $l_uid:*") or + return "E: $url UID SEARCH $l_uid:* error: $!"; + return if scalar(@$uids) == 0; + + # RFC 3501 doesn't seem to indicate order of UID SEARCH + # responses, so sort it ourselves. Order matters so + # IMAPTracker can store the newest UID. + @$uids = sort { $a <=> $b } @$uids; + + # Did we actually get new messages? + return if $uids->[0] < $l_uid; + + $l_uid = $uids->[-1] + 1; # for next search + my $last_uid; + my $n = $self->{max_batch}; + while (scalar @$uids) { + my @batch = splice(@$uids, 0, $bs); + $batch = join(',', @batch); + local $0 = "UID:$batch $mbx $sec"; + my $r = $mic->fetch_hash($batch, $req, 'FLAGS'); + unless ($r) { # network error? + $err = "E: $url UID FETCH $batch error: $!"; + last; + } + for my $uid (@batch) { + # messages get deleted, so holes appear + my $per_uid = delete $r->{$uid} // next; + my $raw = delete($per_uid->{$key}) // next; + _imap_do_msg($self, $url, $uid, \$raw, + $per_uid->{FLAGS}); + $last_uid = $uid; + last if $self->{quit}; + } + last if $self->{quit}; + } + $itrk->update_last($r_uidval, $last_uid) if $itrk; + } until ($err || $self->{quit}); + $err; +} + +sub imap_each { + my ($self, $url, $eml_cb, @args) = @_; + my $uri = PublicInbox::URIimap->new($url); + my $sec = uri_section($uri); + my $mic_arg = $self->{mic_arg}->{$sec} or + die "BUG: no Mail::IMAPClient->new arg for $sec"; + local $0 = $uri->mailbox." 
$sec"; + my $cb_name = $mic_arg->{Authcallback}; + if (ref($cb_name) ne 'CODE') { + $mic_arg->{Authcallback} = $self->can($cb_name); + } + my $mic = PublicInbox::IMAPClient->new(%$mic_arg, Debug => 0); + my $err; + if ($mic && $mic->IsConnected) { + local $self->{eml_each} = [ $eml_cb, @args ]; + $err = _imap_fetch_all($self, $mic, $url); + } else { + $err = "E: not connected: $!"; + } + $mic; +} + sub new { bless {}, shift }; 1; diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index c5070cfd..3eb08e9f 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -462,10 +462,15 @@ our $lei = sub { sub lei (@) { $lei->(@_) } sub lei_ok (@) { - my $msg = ref($_[-1]) ? pop(@_) : undef; + my $msg = ref($_[-1]) eq 'SCALAR' ? pop(@_) : undef; + my $tmpdir = quotemeta(File::Spec->tmpdir); # filter out anything that looks like a path name for consistent logs - my @msg = grep(!m!\A/!, @_); - ok($lei->(@_), "lei @msg". ($msg ? " ($$msg)" : '')); + my @msg = ref($_[0]) eq 'ARRAY' ? @{$_[0]} : @_; + for (@msg) { + s!\A([a-z0-9]+://)[^/]+/!$1\$HOST_PORT/! || + s!$tmpdir\b/(?:[^/]+/)?!\$TMPDIR/!; + } + ok(lei(@_), "lei @msg". ($msg ? 
" ($$msg)" : '')) or diag $lei_err; } sub json_utf8 () { diff --git a/t/lei-convert.t b/t/lei-convert.t new file mode 100644 index 00000000..f58a0a80 --- /dev/null +++ b/t/lei-convert.t @@ -0,0 +1,71 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; +use PublicInbox::MboxReader; +use PublicInbox::MdirReader; +use PublicInbox::NetReader; +require_git 2.6; +require_mods(qw(DBD::SQLite Search::Xapian)); +my ($tmpdir, $for_destroy) = tmpdir; +my $sock = tcp_server; +my $cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/1", "--stderr=$tmpdir/2" ]; +my ($ro_home, $cfg_path) = setup_public_inboxes; +my $env = { PI_CONFIG => $cfg_path }; +my $td = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-imapd: $?"); +my $host_port = tcp_host_port($sock); +undef $sock; +test_lei({ tmpdir => $tmpdir }, sub { + my $d = $ENV{HOME}; + my $dig = Digest::SHA->new(256); + lei_ok('convert', '-o', "mboxrd:$d/foo.mboxrd", + "imap://$host_port/t.v2.0"); + ok(-f "$d/foo.mboxrd", 'mboxrd created'); + my (@mboxrd, @mboxcl2); + open my $fh, '<', "$d/foo.mboxrd" or BAIL_OUT $!; + PublicInbox::MboxReader->mboxrd($fh, sub { push @mboxrd, shift }); + ok(scalar(@mboxrd) > 1, 'got multiple messages'); + + lei_ok('convert', '-o', "mboxcl2:$d/cl2", "mboxrd:$d/foo.mboxrd"); + ok(-s "$d/cl2", 'mboxcl2 non-empty') or diag $lei_err; + open $fh, '<', "$d/cl2" or BAIL_OUT $!; + PublicInbox::MboxReader->mboxcl2($fh, sub { + my $eml = shift; + $eml->header_set($_) for (qw(Content-Length Lines)); + push @mboxcl2, $eml; + }); + is_deeply(\@mboxcl2, \@mboxrd, 'mboxrd and mboxcl2 have same mail'); + + lei_ok('convert', '-o', "$d/md", "mboxrd:$d/foo.mboxrd"); + ok(-d "$d/md", 'Maildir created'); + my @md; + PublicInbox::MdirReader::maildir_each_eml("$d/md", sub { + push @md, $_[1]; + }); + is(scalar(@md), scalar(@mboxrd), 'got expected emails in Maildir'); + @md = sort { 
${$a->{bdy}} cmp ${$b->{bdy}} } @md; + @mboxrd = sort { ${$a->{bdy}} cmp ${$b->{bdy}} } @mboxrd; + my @rd_nostatus = map { + my $eml = PublicInbox::Eml->new(\($_->as_string)); + $eml->header_set('Status'); + $eml; + } @mboxrd; + is_deeply(\@md, \@rd_nostatus, 'Maildir output matches mboxrd'); + + my @bar; + lei_ok('convert', '-o', "mboxrd:$d/bar.mboxrd", "$d/md"); + open $fh, '<', "$d/bar.mboxrd" or BAIL_OUT $!; + PublicInbox::MboxReader->mboxrd($fh, sub { push @bar, shift }); + @bar = sort { ${$a->{bdy}} cmp ${$b->{bdy}} } @bar; + is_deeply(\@mboxrd, \@bar, + 'mboxrd round-tripped through Maildir w/ flags'); + + open my $in, '<', "$d/foo.mboxrd" or BAIL_OUT; + my $rdr = { 0 => $in, 1 => \(my $out), 2 => \$lei_err }; + lei_ok([qw(convert --stdin -F mboxrd -o mboxrd:/dev/stdout)], + undef, $rdr); + open $fh, '<', "$d/foo.mboxrd" or BAIL_OUT; + my $exp = do { local $/; <$fh> }; + is($out, $exp, 'stdin => stdout'); +}); +done_testing; diff --git a/t/net_reader-imap.t b/t/net_reader-imap.t new file mode 100644 index 00000000..eea8b0fd --- /dev/null +++ b/t/net_reader-imap.t @@ -0,0 +1,40 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; +require_git 2.6; +require_mods(qw(DBD::SQLite Search::Xapian)); +my ($tmpdir, $for_destroy) = tmpdir; +my ($ro_home, $cfg_path) = setup_public_inboxes; +my $cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/1", "--stderr=$tmpdir/2" ]; +my $sock = tcp_server; +my $env = { PI_CONFIG => $cfg_path }; +my $td = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT "-imapd: $?"; +my ($host, $port) = tcp_host_port $sock; +require_ok 'PublicInbox::NetReader'; +my $nrd = PublicInbox::NetReader->new; +$nrd->add_url(my $url = "imap://$host:$port/t.v2.0"); +is($nrd->errors, undef, 'no errors'); +$nrd->{pi_cfg} = PublicInbox::Config->new($cfg_path); +$nrd->imap_common_init; +$nrd->{quiet} = 1; +my (%eml, %urls, 
%args, $nr, @w); +local $SIG{__WARN__} = sub { push(@w, @_) }; +$nrd->imap_each($url, sub { + my ($u, $uid, $kw, $eml, $arg) = @_; + ++$urls{$u}; + ++$args{$arg}; + like($uid, qr/\A[0-9]+\z/, 'got digit UID '.$uid); + ++$eml{ref($eml)}; + ++$nr; +}, 'blah'); +is(scalar(@w), 0, 'no warnings'); +ok($nr, 'got some emails'); +is($eml{'PublicInbox::Eml'}, $nr, 'got expected Eml objects'); +is(scalar keys %eml, 1, 'only got Eml objects'); +is($urls{$url}, $nr, 'one URL expected number of times'); +is(scalar keys %urls, 1, 'only got one URL'); +is($args{blah}, $nr, 'got arg expected number of times'); +is(scalar keys %args, 1, 'only got one arg'); + +done_testing;
This makes "lei import" more similar to "lei convert" and allows importing from disparate sources simultaneously. We'll also fix some ->child_error usage errors and make the style of the code more similar to the "lei convert" code. v2: fix missing requires --- MANIFEST | 1 + lib/PublicInbox/LeiImport.pm | 129 ++++++++++++++++++++++++----------- t/lei-import-imap.t | 28 ++++++++ t/lei-import-maildir.t | 4 +- t/lei_to_mail.t | 10 +++ 5 files changed, 130 insertions(+), 42 deletions(-) create mode 100644 t/lei-import-imap.t diff --git a/MANIFEST b/MANIFEST index 4f146771..19f73356 100644 --- a/MANIFEST +++ b/MANIFEST @@ -365,6 +365,7 @@ t/kqnotify.t t/lei-convert.t t/lei-daemon.t t/lei-externals.t +t/lei-import-imap.t t/lei-import-maildir.t t/lei-import.t t/lei-mirror.t diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 32f3a467..62a2a412 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -29,7 +29,7 @@ sub import_done { # EOF callback for main daemon $imp->wq_wait_old(\&import_done_wait, $lei); } -sub do_import { +sub import_start { my ($lei) = @_; my $ops = { '!' 
=> [ $lei->can('fail_handler'), $lei ], @@ -39,7 +39,7 @@ sub do_import { }; ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); my $self = $lei->{imp}; - my $j = $lei->{opt}->{jobs} // scalar(@{$self->{argv}}) || 1; + my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1; if (my $nrd = $lei->{nrd}) { # $j = $nrd->net_concurrency($j); TODO } else { @@ -50,8 +50,8 @@ sub do_import { my $op = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; $self->wq_io_do('import_stdin', []) if $self->{0}; - for my $x (@{$self->{argv}}) { - $self->wq_io_do('import_path_url', [], $x); + for my $input (@{$self->{inputs}}) { + $self->wq_io_do('import_path_url', [], $input); } $self->wq_close(1); $lei->event_step_init; # wait for shutdowns @@ -61,60 +61,91 @@ sub do_import { } sub call { # the main "lei import" method - my ($cls, $lei, @argv) = @_; + my ($cls, $lei, @inputs) = @_; my $sto = $lei->_lei_store(1); $sto->write_prepare($lei); + my ($nrd, @f, @d); $lei->{opt}->{kw} //= 1; - my $self = $lei->{imp} = bless { argv => \@argv }, $cls; + my $self = $lei->{imp} = bless { inputs => \@inputs }, $cls; if ($lei->{opt}->{stdin}) { - @argv and return - $lei->fail("--stdin and locations (@argv) do not mix"); + @inputs and return $lei->fail("--stdin and @inputs do not mix"); $lei->check_input_format or return; $self->{0} = $lei->{0}; - } else { - my @f; - for my $x (@argv) { - if (-f $x) { push @f, $x } - elsif (-d _) { require PublicInbox::MdirReader } - else { - require PublicInbox::NetReader; - $lei->{nrd} //= PublicInbox::NetReader->new; - $lei->{nrd}->add_url($x); + } + + # TODO: do we need --format for non-stdin? + my $fmt = $lei->{opt}->{'format'}; + # e.g. 
Maildir:/home/user/Mail/ or imaps://example.com/INBOX + for my $input (@inputs) { + my $input_path = $input; + if ($input =~ m!\A(?:imap|nntp)s?://!i) { + require PublicInbox::NetReader; + $nrd //= PublicInbox::NetReader->new; + $nrd->add_url($input); + } elsif ($input_path =~ s/\A([a-z0-9]+)://is) { + my $ifmt = lc $1; + if (($fmt // $ifmt) ne $ifmt) { + return $lei->fail(<<""); +--format=$fmt and `$ifmt:' conflict + } - } - if (@f) { $lei->check_input_format(\@f) or return } - if ($lei->{nrd} && (my @err = $lei->{nrd}->errors)) { - return $lei->fail(@err); - } + if (-f $input_path) { + require PublicInbox::MboxReader; + PublicInbox::MboxReader->can($ifmt) or return + $lei->fail("$ifmt not supported"); + } elsif (-d _) { + require PublicInbox::MdirReader; + $ifmt eq 'maildir' or return + $lei->fail("$ifmt not supported"); + } else { + return $lei->fail("Unable to handle $input"); + } + } elsif (-f $input) { push @f, $input + } elsif (-d _) { push @d, $input + } else { return $lei->fail("Unable to handle $input") } } - do_import($lei); + if (@f) { $lei->check_input_format(\@f) or return } + if (@d) { # TODO: check for MH vs Maildir, here + require PublicInbox::MdirReader; + } + $self->{inputs} = \@inputs; + return import_start($lei) if !$nrd; + + if (my $err = $nrd->errors) { + return $lei->fail($err); + } + $nrd->{quiet} = $lei->{opt}->{quiet}; + $lei->{nrd} = $nrd; + require PublicInbox::LeiAuth; + my $auth = $lei->{auth} = PublicInbox::LeiAuth->new($nrd); + $auth->auth_start($lei, \&import_start, $lei); } sub ipc_atfork_child { my ($self) = @_; + delete $self->{lei}->{imp}; # drop circular ref $self->{lei}->lei_atfork_child; $self->SUPER::ipc_atfork_child; } sub _import_fh { - my ($lei, $fh, $x) = @_; + my ($lei, $fh, $input, $ifmt) = @_; my $set_kw = $lei->{opt}->{kw}; - my $fmt = $lei->{opt}->{'format'}; eval { - if ($fmt eq 'eml') { + if ($ifmt eq 'eml') { my $buf = do { local $/; <$fh> } // - return $lei->child_error(1 >> 8, <<""); -error reading $x: $! 
+ return $lei->child_error(1 << 8, <<""); +error reading $input: $! my $eml = PublicInbox::Eml->new(\$buf); _import_eml($eml, $lei->{sto}, $set_kw); } else { # some mbox (->can already checked in call); - my $cb = PublicInbox::MboxReader->can($fmt) // - die "BUG: bad fmt=$fmt"; + my $cb = PublicInbox::MboxReader->can($ifmt) // + die "BUG: bad fmt=$ifmt"; $cb->(undef, $fh, \&_import_eml, $lei->{sto}, $set_kw); } }; - $lei->child_error(1 >> 8, "<stdin>: $@") if $@; + $lei->child_error(1 << 8, "<stdin>: $@") if $@; } sub _import_maildir { # maildir_each_file cb @@ -122,27 +153,45 @@ sub _import_maildir { # maildir_each_file cb $sto->ipc_do('set_eml_from_maildir', $f, $set_kw); } +sub _import_imap { # imap_each cb + my ($url, $uid, $kw, $eml, $sto, $set_kw) = @_; + warn "$url $uid"; + $sto->ipc_do('set_eml', $eml, $set_kw ? @$kw : ()); +} + sub import_path_url { - my ($self, $x) = @_; + my ($self, $input) = @_; my $lei = $self->{lei}; + my $ifmt = lc($lei->{opt}->{'format'} // ''); # TODO auto-detect? - if (-f $x) { - open my $fh, '<', $x or return $lei->child_error(1 >> 8, <<""); -unable to open $x: $! + if ($input =~ m!\A(imap|nntp)s?://!i) { + $lei->{nrd}->imap_each($input, \&_import_imap, $lei->{sto}, + $lei->{opt}->{kw}); + return; + } elsif ($input =~ s!\A([a-z0-9]+):!!i) { + $ifmt = lc $1; + } + if (-f $input) { + open my $fh, '<', $input or return $lei->child_error(1 << 8, <<""); +unable to open $input: $! 
- _import_fh($lei, $fh, $x); - } elsif (-d _ && (-d "$x/cur" || -d "$x/new")) { - PublicInbox::MdirReader::maildir_each_file($x, + _import_fh($lei, $fh, $input, $ifmt); + } elsif (-d _ && (-d "$input/cur" || -d "$input/new")) { + return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir'; +$input appears to a be a maildir, not $ifmt +EOM + PublicInbox::MdirReader::maildir_each_file($input, \&_import_maildir, $lei->{sto}, $lei->{opt}->{kw}); } else { - $lei->fail("$x unsupported (TODO)"); + $lei->fail("$input unsupported (TODO)"); } } sub import_stdin { my ($self) = @_; - _import_fh($self->{lei}, $self->{0}, '<stdin>'); + my $lei = $self->{lei}; + _import_fh($lei, delete $self->{0}, '<stdin>', $lei->{opt}->{'format'}); } 1; diff --git a/t/lei-import-imap.t b/t/lei-import-imap.t new file mode 100644 index 00000000..ee308723 --- /dev/null +++ b/t/lei-import-imap.t @@ -0,0 +1,28 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; +require_git 2.6; +require_mods(qw(DBD::SQLite Search::Xapian)); +my ($ro_home, $cfg_path) = setup_public_inboxes; +my ($tmpdir, $for_destroy) = tmpdir; +my $sock = tcp_server; +my $cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/1", "--stderr=$tmpdir/2" ]; +my $env = { PI_CONFIG => $cfg_path }; +my $td = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-imapd: $?"); +my $host_port = tcp_host_port($sock); +undef $sock; +test_lei({ tmpdir => $tmpdir }, sub { + lei_ok(qw(q bytes:1..)); + my $out = json_utf8->decode($lei_out); + is_deeply($out, [ undef ], 'nothing imported, yet'); + lei_ok('import', "imap://$host_port/t.v2.0"); + lei_ok(qw(q bytes:1..)); + $out = json_utf8->decode($lei_out); + ok(scalar(@$out) > 1, 'got imported messages'); + is(pop @$out, undef, 'trailing JSON null element was null'); + my %r; + for (@$out) { $r{ref($_)}++ } + is_deeply(\%r, { 'HASH' => scalar(@$out) }, 'all hashes'); 
+}); +done_testing; diff --git a/t/lei-import-maildir.t b/t/lei-import-maildir.t index 5842e19e..d2b059ad 100644 --- a/t/lei-import-maildir.t +++ b/t/lei-import-maildir.t @@ -23,8 +23,8 @@ test_lei(sub { is_deeply($r2, $res, 'idempotent import'); rename("$md/cur/x:2,S", "$md/cur/x:2,SR") or BAIL_OUT "rename: $!"; - ok($lei->(qw(import), $md), 'import Maildir after +answered'); - ok($lei->(qw(q -d none s:boolean)), 'lei q after +answered'); + lei_ok('import', "maildir:$md", \'import Maildir after +answered'); + lei_ok(qw(q -d none s:boolean), \'lei q after +answered'); $res = json_utf8->decode($lei_out); like($res->[0]->{'s'}, qr/use boolean/, 'got expected result'); is_deeply($res->[0]->{kw}, ['answered', 'seen'], 'keywords set'); diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index 6a571660..72b90700 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -139,6 +139,16 @@ test_lei(sub { is($res->[1], undef, 'only one result'); }); +test_lei(sub { + lei_ok('import', "$mbox:$fn", \'imported mbox:/path') or diag $lei_err; + lei_ok(qw(q s:x), \'lei q works') or diag $lei_err; + my $res = json_utf8->decode($lei_out); + my $x = $res->[0]; + is($x->{'s'}, 'x', 'subject imported') or diag $lei_out; + is_deeply($x->{'kw'}, ['seen'], 'kw imported') or diag $lei_out; + is($res->[1], undef, 'only one result'); +}); + for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? my $zsfx2cmd = PublicInbox::LeiToMail->can('zsfx2cmd'); SKIP: {
The backends for "lei add-external --mirror", "lei convert", and "lei import" all share a similar pattern for spawning background workers. Hoist out the common parts to slim down our code base a bit. The LeiXSearch and LeiToMail workers for "lei q" remain the odd ducks due to the deep pipelining and parallelization. --- lib/PublicInbox/LEI.pm | 19 +++++++++++++++++++ lib/PublicInbox/LeiAuth.pm | 17 +++-------------- lib/PublicInbox/LeiConvert.pm | 22 +++++----------------- lib/PublicInbox/LeiImport.pm | 19 ++++--------------- lib/PublicInbox/LeiMirror.pm | 19 ++++--------------- 5 files changed, 35 insertions(+), 61 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 1e4c36d0..0b4bc20e 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -468,6 +468,25 @@ sub lei_atfork_child { $current_lei = $persist ? undef : $self; # for SIG{__WARN__} } +sub workers_start { + my ($lei, $wq, $ident, $jobs, $ops) = @_; + $ops = { + '!' => [ $lei->can('fail_handler'), $lei ], + '|' => [ $lei->can('sigpipe_handler'), $lei ], + 'x_it' => [ $lei->can('x_it'), $lei ], + 'child_error' => [ $lei->can('child_error'), $lei ], + %$ops + }; + require PublicInbox::PktOp; + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei }); + delete $lei->{pkt_op_p}; + my $op = delete $lei->{pkt_op_c}; + $lei->event_step_init; + # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op + $lei->{oneshot} ? $op : undef; +} + sub _help { require PublicInbox::LeiHelp; PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC); diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm index 6593ba51..7acb9900 100644 --- a/lib/PublicInbox/LeiAuth.pm +++ b/lib/PublicInbox/LeiAuth.pm @@ -43,24 +43,13 @@ sub auth_eof { sub auth_start { my ($self, $lei, $post_auth_cb, @args) = @_; $lei->_lei_cfg(1); # workers may need to read config - my $ops = { - '!'
=> [ $lei->can('fail_handler'), $lei ], - '|' => [ $lei->can('sigpipe_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], + my $op = $lei->workers_start($self, 'auth', 1, { 'nrd_merge' => [ \&nrd_merge, $lei ], '' => [ \&auth_eof, $lei, $post_auth_cb, @args ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); - $self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + }); $self->wq_io_do('do_auth', []); $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 78fd5e17..ba375772 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -8,7 +8,6 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::InboxWritable qw(eml_from_path); -use PublicInbox::PktOp; use PublicInbox::LeiStore; use PublicInbox::LeiOverview; @@ -59,26 +58,15 @@ sub do_convert { # via wq_do delete $self->{wcb}; # commit } -sub convert_start { +sub convert_start { # LeiAuth->auth_start callback my ($lei) = @_; - my $ops = { - '!' 
=> [ $lei->can('fail_handler'), $lei ], - '|' => [ $lei->can('sigpipe_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - '' => [ $lei->can('dclose'), $lei ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); my $self = $lei->{cnv}; - $self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + my $op = $lei->workers_start($self, 'lei_convert', 1, { + '' => [ $lei->can('dclose'), $lei ] + }); $self->wq_io_do('do_convert', []); $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub call { # the main "lei convert" method diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 62a2a412..68cab12c 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -8,7 +8,6 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::InboxWritable qw(eml_from_path); -use PublicInbox::PktOp; sub _import_eml { # MboxReader callback my ($eml, $sto, $set_kw) = @_; @@ -31,13 +30,6 @@ sub import_done { # EOF callback for main daemon sub import_start { my ($lei) = @_; - my $ops = { - '!' 
=> [ $lei->can('fail_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - '' => [ \&import_done, $lei ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); my $self = $lei->{imp}; my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1; if (my $nrd = $lei->{nrd}) { @@ -46,18 +38,15 @@ sub import_start { my $nproc = $self->detect_nproc; $j = $nproc if $j > $nproc; } - $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + my $op = $lei->workers_start($self, 'lei_import', $j, { + '' => [ \&import_done, $lei ], + }); $self->wq_io_do('import_stdin', []) if $self->{0}; for my $input (@{$self->{inputs}}) { $self->wq_io_do('import_path_url', [], $input); } $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub call { # the main "lei import" method diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index c5153148..f8ca1ee5 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -8,7 +8,6 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use IO::Uncompress::Gunzip qw(gunzip $GunzipError); use PublicInbox::Spawn qw(popen_rd spawn); -use PublicInbox::PktOp; sub do_finish_mirror { # dwaitpid callback my ($arg, $pid) = @_; @@ -279,22 +278,12 @@ sub start { require PublicInbox::Inbox; require PublicInbox::Admin; require PublicInbox::InboxWritable; - my $ops = { - '!' 
=> [ $lei->can('fail_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - '' => [ \&mirror_done, $lei ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); - $self->wq_workers_start('lei_mirror', 1, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + my $op = $lei->workers_start($self, 'lei_mirror', 1, { + '' => [ \&mirror_done, $lei ] + }); $self->wq_io_do('do_mirror', []); $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub ipc_atfork_child {
We need to ensure authentication failures and error codes get propagated to the parent process(es) properly. v2: update MANIFEST v3: LeiAuth.pm ->_lei_cfg bit moved to a previous commit --- MANIFEST | 1 + lib/PublicInbox/NetReader.pm | 3 +++ xt/lei-auth-fail.t | 20 ++++++++++++++++++++ 3 files changed, 24 insertions(+) create mode 100644 xt/lei-auth-fail.t diff --git a/MANIFEST b/MANIFEST index 19f73356..3d9ad616 100644 --- a/MANIFEST +++ b/MANIFEST @@ -466,6 +466,7 @@ xt/git_async_cmp.t xt/httpd-async-stream.t xt/imapd-mbsync-oimap.t xt/imapd-validate.t +xt/lei-auth-fail.t xt/lei-sigpipe.t xt/mem-imapd-tls.t xt/mem-msgview.t diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index ad8c18d0..61ea538b 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -89,6 +89,9 @@ sub mic_for { # mic = Mail::IMAPClient $self->{mic_arg}->{uri_section($uri)} = $mic_arg; } else { $err = "E: <$url> LOGIN: $@\n"; + if ($cred && defined($cred->{password})) { + $err =~ s/\Q$cred->{password}\E/*******/g; + } $mic = undef; } $cred->run($mic ? 
'approve' : 'reject') if $cred; diff --git a/xt/lei-auth-fail.t b/xt/lei-auth-fail.t new file mode 100644 index 00000000..5308d0f9 --- /dev/null +++ b/xt/lei-auth-fail.t @@ -0,0 +1,20 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; + +# TODO: mock IMAP server which fails at authentication so we don't +# have to make external connections to test this: +my $imap_fail = $ENV{TEST_LEI_IMAP_FAIL_URL} // + 'imaps://AzureDiamond:Hunter2@public-inbox.org:994/INBOX'; +test_lei(sub { + ok(!lei(qw(convert -o mboxrd:/dev/stdout), $imap_fail), + 'IMAP auth failure on convert'); + like($lei_err, qr!\bE:.*?imaps://.*?!sm, 'error shown'); + unlike($lei_err, qr!Hunter2!s, 'password not shown'); + is($lei_out, '', 'nothing output'); + ok(!lei(qw(import), $imap_fail), 'IMAP auth failure on import'); + like($lei_err, qr!\bE:.*?imaps://.*?!sm, 'error shown'); + unlike($lei_err, qr!Hunter2!s, 'password not shown'); +}); +done_testing;