From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 88E7E1F9F3 for ; Sat, 29 May 2021 20:20:39 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 2/3] lei import|lcat: improve+fix single message IMAP support Date: Sat, 29 May 2021 20:20:38 +0000 Message-Id: <20210529202039.21412-3-e@80x24.org> In-Reply-To: <20210529202039.21412-1-e@80x24.org> References: <20210529202039.21412-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: lcat can now dump the memoized contents of entire IMAP folders, not just a single UID. It's now parallelized and pipelined for multiple lei2mail workers. Furthermore, various forms of JSON output work consistently with blob-only output, now. While working on this, I noticed NetReader was passing UID URLs to imap_each callbacks, which was causing mail_sync.sqlite3 to store UIDs in `folders'. That was clearly wrong, so it's now fixed. 
--- lib/PublicInbox/LeiLcat.pm | 25 ++++++++------ lib/PublicInbox/LeiMailSync.pm | 23 ++++++++----- lib/PublicInbox/LeiToMail.pm | 6 ---- lib/PublicInbox/LeiXSearch.pm | 60 +++++++++++++++++++++++++++++++--- lib/PublicInbox/NetReader.pm | 6 ++-- t/lei-import-imap.t | 13 +++++++- 6 files changed, 101 insertions(+), 32 deletions(-) diff --git a/lib/PublicInbox/LeiLcat.pm b/lib/PublicInbox/LeiLcat.pm index f9d9633a..effc3682 100644 --- a/lib/PublicInbox/LeiLcat.pm +++ b/lib/PublicInbox/LeiLcat.pm @@ -11,14 +11,21 @@ use PublicInbox::LeiViewText; use URI::Escape qw(uri_unescape); use PublicInbox::MID qw($MID_EXTRACT); -sub lcat_imap_uid_uri ($$) { - my ($lei, $uid_uri) = @_; +sub lcat_imap_uri ($$) { + my ($lei, $uri) = @_; my $lms = $lei->{lse}->lms or return; - my $oidhex = $lms->imap_oid($lei, $uid_uri); - if (ref(my $err = $oidhex)) { # art2folder error - $lei->qerr(@{$err->{qerr}}) if $err->{qerr}; + # cf. LeiToMail->wq_atexit_child + if (defined $uri->uid) { + my $oidhex = $lms->imap_oid($lei, $uri); + if (ref(my $err = $oidhex)) { # art2folder error + $lei->qerr(@{$err->{qerr}}) if $err->{qerr}; + } + push @{$lei->{lcat_blob}}, $oidhex; + } elsif (defined(my $fid = $lms->fid_for($$uri))) { + push @{$lei->{lcat_fid}}, $fid; + } else { + $lei->child_error(1 << 8, "# unknown folder: $uri"); } - push @{$lei->{lcat_blob}}, $oidhex; # cf. LeiToMail->wq_atexit_child } sub extract_1 ($$) { @@ -26,10 +33,8 @@ sub extract_1 ($$) { if ($x =~ m!\b(imaps?://[^>]+)!i) { my $u = $1; require PublicInbox::URIimap; - $u = PublicInbox::URIimap->new($u); - defined($u->uid) ? 
lcat_imap_uid_uri($lei, $u) : - $lei->child_error(1 << 8, "# no UID= in $u"); - '""'; # blank query, using {lcat_blob} + lcat_imap_uri($lei, PublicInbox::URIimap->new($u)); + '""'; # blank query, using {lcat_blob} or {lcat } elsif ($x =~ m!\b([a-z]+?://\S+)!i) { my $u = $1; $u =~ s/[\>\]\)\,\.\;]+\z//; diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm index 5c0988b5..c7f78239 100644 --- a/lib/PublicInbox/LeiMailSync.pm +++ b/lib/PublicInbox/LeiMailSync.pm @@ -64,9 +64,9 @@ CREATE TABLE IF NOT EXISTS blob2name ( } -sub _fid_for { +sub fid_for { my ($self, $folder, $rw) = @_; - my $dbh = $self->{dbh}; + my $dbh = $self->{dbh} //= dbh_new($self, $rw); my $sel = 'SELECT fid FROM folders WHERE loc = ? LIMIT 1'; my ($fid) = $dbh->selectrow_array($sel, undef, $folder); return $fid if defined $fid; @@ -111,7 +111,7 @@ EOM sub set_src { my ($self, $oidhex, $folder, $id) = @_; - my $fid = $self->{fmap}->{$folder} //= _fid_for($self, $folder, 1); + my $fid = $self->{fmap}->{$folder} //= fid_for($self, $folder, 1); my $sth; if (ref($id)) { # scalar name $id = $$id; @@ -128,7 +128,7 @@ INSERT OR IGNORE INTO blob2num (oidbin, fid, uid) VALUES (?, ?, ?) sub clear_src { my ($self, $folder, $id) = @_; - my $fid = $self->{fmap}->{$folder} //= _fid_for($self, $folder, 1); + my $fid = $self->{fmap}->{$folder} //= fid_for($self, $folder, 1); my $sth; if (ref($id)) { # scalar name $id = $$id; @@ -146,7 +146,7 @@ DELETE FROM blob2num WHERE fid = ? AND uid = ? # Maildir-only sub mv_src { my ($self, $folder, $oidbin, $id, $newbn) = @_; - my $fid = $self->{fmap}->{$folder} //= _fid_for($self, $folder, 1); + my $fid = $self->{fmap}->{$folder} //= fid_for($self, $folder, 1); my $sth = $self->{dbh}->prepare_cached(<<''); UPDATE blob2name SET name = ? WHERE fid = ? AND oidbin = ? AND name = ? 
@@ -158,7 +158,12 @@ sub each_src { my ($self, $folder, $cb, @args) = @_; my $dbh = $self->{dbh} //= dbh_new($self); my ($fid, $sth); - $fid = $self->{fmap}->{$folder} //= _fid_for($self, $folder) // return; + if (ref($folder) eq 'HASH') { + $fid = $folder->{fid} // die "BUG: no `fid'"; + } else { + $fid = $self->{fmap}->{$folder} //= + fid_for($self, $folder) // return; + } $sth = $dbh->prepare('SELECT oidbin,uid FROM blob2num WHERE fid = ?'); $sth->execute($fid); while (my ($oidbin, $id) = $sth->fetchrow_array) { @@ -176,7 +181,7 @@ sub location_stats { my $dbh = $self->{dbh} //= dbh_new($self); my $fid; my $ret = {}; - $fid = $self->{fmap}->{$folder} //= _fid_for($self, $folder) // return; + $fid = $self->{fmap}->{$folder} //= fid_for($self, $folder) // return; my ($row) = $dbh->selectrow_array(<<"", undef, $fid); SELECT COUNT(name) FROM blob2name WHERE fid = ? @@ -349,7 +354,7 @@ sub forget_folder { my ($self, $folder) = @_; my ($fid, $sth); $fid = delete($self->{fmap}->{$folder}) // - _fid_for($self, $folder) // return; + fid_for($self, $folder) // return; my $dbh = $self->{dbh}; $dbh->do('DELETE FROM blob2name WHERE fid = ?', undef, $fid); $dbh->do('DELETE FROM blob2num WHERE fid = ?', undef, $fid); @@ -369,7 +374,7 @@ sub imap_oid { $lei->qerr(@{$err->{qerr}}) if $err->{qerr}; } my $fid = $self->{fmap}->{$folders->[0]} //= - _fid_for($self, $folders->[0]) // return; + fid_for($self, $folders->[0]) // return; my $sth = $self->{dbh}->prepare_cached(<wq_io_do sub wq_atexit_child { my ($self) = @_; my $lei = $self->{lei}; - if (!$self->{-wq_worker_nr} && $lei->{lcat_blob}) { - for my $oid (@{$lei->{lcat_blob}}) { - my $smsg = { blob => $oid, pct => 100 }; - write_mail($self, $smsg); - } - } delete $self->{wcb}; $lei->{ale}->git->async_wait_all; my $nr = delete($lei->{-nr_write}) or return; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index d6d42a01..f7c1e559 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ 
b/lib/PublicInbox/LeiXSearch.pm @@ -256,6 +256,14 @@ sub query_combined_mset { # non-parallel for non-"--threads" users $lei->{ovv}->ovv_atexit_child($lei); } +sub _smsg_fill ($$) { + my ($smsg, $eml) = @_; + $smsg->populate($eml); + $smsg->parse_references($eml, mids($eml)); + $smsg->{$_} //= '' for qw(from to cc ds subject references mid); + delete @$smsg{qw(From Subject -ds -ts)}; +} + sub each_remote_eml { # callback for MboxReader->mboxrd my ($eml, $self, $lei, $each_smsg) = @_; my $xoids = $lei->{ale}->xoids_for($eml, 1); @@ -265,10 +273,7 @@ sub each_remote_eml { # callback for MboxReader->mboxrd my $smsg = bless {}, 'PublicInbox::Smsg'; $smsg->{blob} = $xoids ? (keys(%$xoids))[0] : git_sha(1, $eml)->hexdigest; - $smsg->populate($eml); - $smsg->parse_references($eml, mids($eml)); - $smsg->{$_} //= '' for qw(from to cc ds subject references mid); - delete @$smsg{qw(From Subject -ds -ts)}; + _smsg_fill($smsg, $eml); wait_startq($lei); if ($lei->{-progress}) { ++$lei->{-nr_remote_eml}; @@ -453,6 +458,9 @@ sub start_query ($;$) { # always runs in main (lei-daemon) process for my $uris (@$q) { $self->wq_io_do('query_remote_mboxrd', [], $uris); } + if ($self->{-do_lcat}) { + $self->wq_io_do('lcat_dump', []); + } $self->wq_close(1); # lei_xsearch workers stop when done } @@ -518,6 +526,7 @@ sub do_query { @$end = (); $self->{opt_threads} = $lei->{opt}->{threads}; $self->{opt_sort} = $lei->{opt}->{'sort'}; + $self->{-do_lcat} = $lei->{lcat_blob} // $lei->{lcat_fid}; if ($l2m) { $l2m->net_merge_all_done unless $lei->{auth}; } else { @@ -561,5 +570,48 @@ sub prepare_external { push @{$self->{locals}}, $loc; } +sub _lcat_i { # LeiMailSync->each_src iterator callback + my ($oidbin, $id, $each_smsg) = @_; + $each_smsg->({blob => unpack('H*', $oidbin), pct => 100}); +} + +sub _lcat2smsg { # git->cat_async callback + my ($bref, $oid, $type, $size, $smsg) = @_; + if ($bref) { + my $eml = PublicInbox::Eml->new($bref); + my $json_dump = delete $smsg->{-json_dump}; + bless 
$smsg, 'PublicInbox::Smsg'; + _smsg_fill($smsg, $eml); + $json_dump->($smsg, undef, $eml); + } +} + +sub lcat_dump { + my ($self) = @_; + my $lei = $self->{lei}; + my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); + my $git = $lei->{ale}->git; + if (!$lei->{l2m}) { + my $json_dump = $each_smsg; + $each_smsg = sub { + my ($smsg) = @_; + use Data::Dumper; + $smsg->{-json_dump} = $json_dump; + $git->cat_async($smsg->{blob}, \&_lcat2smsg, $smsg); + }; + } + for my $oid (@{$lei->{lcat_blob} // []}) { + $each_smsg->({ blob => $oid, pct => 100 }); + } + if (my $fids = delete $lei->{lcat_fid}) { + my $lms = $lei->{lse}->lms; + for my $fid (@$fids) { + $lms->each_src({fid => $fid}, \&_lcat_i, $each_smsg); + } + } + $git->async_wait_all; + undef $each_smsg; # may commit + $lei->{ovv}->ovv_atexit_child($lei); +} 1; diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index 76d2fe62..54c6b082 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -471,8 +471,10 @@ EOF my $uri = $orig_uri->clone; my $single_uid = $uri->uid; my ($itrk, $l_uid, $l_uidval) = itrk_last($self, $uri, $r_uidval, $mic); - $itrk = $l_uid = undef if defined($single_uid); - + if (defined($single_uid)) { + $itrk = $l_uid = undef; + $uri->uid(undef); # for eml_cb + } return < $tmpdir }, sub { ok($stats->{'uid.min'} < $stats->{'uid.max'}, 'min < max'); ok($stats->{'uid.count'} > 0, 'count > 0'); + lei_ok('lcat', $url); + is(scalar(grep(/^# blob:/, split(/\n/ms, $lei_out))), + $stats->{'uid.count'}, 'lcat on URL dumps folder'); + lei_ok qw(lcat -f json), $url; + $out = json_utf8->decode($lei_out); + is(scalar(@$out) - 1, $stats->{'uid.count'}, 'lcat JSON dumps folder'); + lei_ok(qw(q z:1..)); $out = json_utf8->decode($lei_out); ok(scalar(@$out) > 1, 'got imported messages'); @@ -77,6 +84,8 @@ test_lei({ tmpdir => $tmpdir }, sub { unlike($lei_out, qr!\Q$host_port\E!, 'sync info gone after forget'); my $uid_url = "$url/;UID=".$stats->{'uid.max'}; lei_ok 'import', 
$uid_url; + lei_ok 'ls-mail-sync'; + is($lei_out, "$url\n", 'ls-mail-sync added URL w/o UID'); lei_ok 'inspect', $uid_url; $lei_out =~ /([a-f0-9]{40,})/ or xbail 'inspect missed blob with UID URL'; @@ -88,7 +97,9 @@ test_lei({ tmpdir => $tmpdir }, sub { my $orig = $lei_out; lei_ok 'lcat', "blob:$blob"; is($lei_out, $orig, 'lcat understands blob:...'); - ok(!lei('lcat', $url), "lcat doesn't work on IMAP URL w/o UID"); + lei_ok qw(lcat -f json), $uid_url; + $out = json_utf8->decode($lei_out); + is(scalar(@$out), 2, 'got JSON') or diag explain($out); }); done_testing;