From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 7B7761FB06 for ; Thu, 31 Dec 2020 13:51:55 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 09/36] lei_to_mail: start --augment, dedupe, bz2 and xz Date: Thu, 31 Dec 2020 13:51:27 +0000 Message-Id: <20201231135154.6070-10-e@80x24.org> In-Reply-To: <20201231135154.6070-1-e@80x24.org> References: <20201231135154.6070-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: --augment will match the mairix(1) option of the same name to augment existing search results. We'll need to implement deduplication for a better user experience. mutt ships with compressed mbox support for bz2 and xz, at least, so we'll support those out-of-the-box. --- lib/PublicInbox/LeiToMail.pm | 140 ++++++++++++++++++++++++++-------- lib/PublicInbox/Lock.pm | 7 ++ lib/PublicInbox/MboxReader.pm | 3 + t/lei_to_mail.t | 47 ++++++++---- 4 files changed, 150 insertions(+), 47 deletions(-) diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index ebb50c50..294291b2 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -8,10 +8,12 @@ use v5.10.1; use PublicInbox::Eml; use PublicInbox::Lock; use PublicInbox::ProcessPipe; -use PublicInbox::Spawn qw(which spawn); +use PublicInbox::SharedKV; +use PublicInbox::Spawn qw(which spawn popen_rd); +use PublicInbox::ContentHash qw(content_hash); use Symbol qw(gensym); -use File::Temp (); use IO::Handle; # ->autoflush +use Fcntl qw(SEEK_SET); my %kw2char = ( # Maildir characters draft => 'D', @@ -150,51 +152,123 @@ sub reap_compress { # dwaitpid callback $lei->fail("@$cmd failed", $? 
>> 8); } -sub compress_dst { - my ($out, $sfx, $lei) = @_; - my $cmd = []; - if ($sfx eq 'gz') { - $cmd->[0] = which($lei->{env}->{GZIP} // 'pigz') // - which('gzip') // - die "pigz or gzip missing for $sfx\n"; - # TODO: use IO::Compress::Gzip - push @$cmd, '-c'; # stdout - push @$cmd, '--rsyncable' if $lei->{opt}->{rsyncable}; - } else { - die "TODO $sfx" +# all of these support -c for stdout and -d for decompression, +# mutt is commonly distributed with hooks for gz, bz2 and xz, at least +# { foo => '' } means "--foo" is passed to the command-line, +# otherwise { foo => '--bar' } passes "--bar" +our %zsfx2cmd = ( + gz => [ qw(GZIP pigz gzip), { + rsyncable => '', threads => '-p' } ], + bz2 => [ 'bzip2', {} ], + xz => [ 'xz', { threads => '-T' } ], + # XXX does anybody care for these? I prefer zstd on entire FSes, + # so it's probably not necessary on a per-file basis + # zst => [ 'zstd', { -default => [ qw(-q) ], # it's noisy by default + # rsyncable => '', threads => '-T' } ], + # zz => [ 'pigz', { -default => [ '--zlib' ], + # rsyncable => '', threads => '-p' }], + # lzo => [ 'lzop', {} ], + # lzma => [ 'lzma', {} ], +); + +sub zsfx2cmd ($$$) { + my ($zsfx, $decompress, $lei) = @_; + my $x = $zsfx2cmd{$zsfx} // die "no support for suffix=.$zsfx"; + my @info = @$x; + my $cmd_opt = pop @info; + my @cmd = (undef, $decompress ? qw(-dc) : qw(-c)); + for my $exe (@info) { + # I think respecting client's ENV{GZIP} is OK, not sure + # about ENV overrides for other, less-common compressors + if ($exe eq uc($exe)) { + $exe = $lei->{env}->{$exe} or next; + } + $cmd[0] = which($exe) and last; + } + $cmd[0] // die join(' or ', @info)." missing for .$zsfx"; + # push @cmd, @{$cmd_opt->{-default}} if $cmd_opt->{-default}; + for my $bool (qw(rsyncable)) { # boolean switches: only when the user asked for them + my $switch = $cmd_opt->{$bool} // next; + push @cmd, '--'.($switch || $bool) if $lei->{opt}->{$bool}; + } + for my $key (qw(threads)) { # support compression level? 
+ my $switch = $cmd_opt->{$key} // next; + my $val = $lei->{opt}->{$key} // next; + push @cmd, $switch, $val; } + \@cmd; +} + +sub compress_dst { + my ($out, $zsfx, $lei) = @_; + my $cmd = zsfx2cmd($zsfx, undef, $lei); pipe(my ($r, $w)) or die "pipe: $!"; my $rdr = { 0 => $r, 1 => $out, 2 => $lei->{2} }; my $pid = spawn($cmd, $lei->{env}, $rdr); $lei->{"pid.$pid"} = $cmd; my $pp = gensym; tie *$pp, 'PublicInbox::ProcessPipe', $pid, $w, \&reap_compress, $lei; - my $tmp = File::Temp->new("$sfx.lock-XXXXXX", TMPDIR => 1); - my $pipe_lk = ($lei->{opt}->{jobs} // 0) > 1 ? bless({ - lock_path => $tmp->filename, - tmp => $tmp - }, 'PublicInbox::Lock') : undef; + my $pipe_lk = ($lei->{opt}->{jobs} // 0) > 1 ? + PublicInbox::Lock->new_tmp($zsfx) : undef; ($pp, $pipe_lk); } -sub write_cb { - my ($cls, $dst, $lei) = @_; - if ($dst =~ s!\A(mbox(?:rd|cl|cl2|o))?:!!) { - my $m = "eml2$1"; - my $eml2mbox = $cls->can($m) or die "$cls->$m missing"; - my ($out, $pipe_lk); - open $out, '>>', $dst or die "open $dst: $!"; - my $atomic = !!(($lei->{opt}->{jobs} // 0) > 1); - if ($dst =~ /\.(gz|bz2|xz)\z/) { - ($out, $pipe_lk) = compress_dst($out, $1, $lei); - } - sub { - my ($buf, $oid, $kw) = @_; - $buf = $eml2mbox->(PublicInbox::Eml->new($buf), $kw); +sub decompress_src ($$$) { + my ($in, $zsfx, $lei) = @_; + my $cmd = zsfx2cmd($zsfx, 1, $lei); + my $rdr = { 0 => $in, 2 => $lei->{2} }; + popen_rd($cmd, $lei->{env}, $rdr); +} + +sub dup_src ($) { + my ($in) = @_; + open my $dup, '+>>&', $in or die "dup: $!"; + $dup; +} + +# --augment existing output destination, without duplicating anything +sub _augment { # MboxReader eml_cb + my ($eml, $lei) = @_; + $lei->{skv}->set_maybe(content_hash($eml), ''); +} + +sub _mbox_write_cb ($$$$) { + my ($cls, $mbox, $dst, $lei) = @_; + my $m = "eml2$mbox"; + my $eml2mbox = $cls->can($m) or die "$cls->$m missing"; + my ($out, $pipe_lk); + open $out, '+>>', $dst or die "open $dst: $!"; + # Perl does SEEK_END even with O_APPEND :< + seek($out, 0, 
SEEK_SET) or die "seek $dst: $!"; + my $atomic = !!(($lei->{opt}->{jobs} // 0) > 1); + $lei->{skv} = PublicInbox::SharedKV->new; + $lei->{skv}->dbh; + state $zsfx_allow = join('|', keys %zsfx2cmd); + my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/); + if ($lei->{opt}->{augment}) { + my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : + dup_src($out); + PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei); + } else { + truncate($out, 0) or die "truncate $dst: $!"; + } + ($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx; + sub { + my ($buf, $oid, $kw) = @_; + my $eml = PublicInbox::Eml->new($buf); + if ($lei->{skv}->set_maybe(content_hash($eml), '')) { + $buf = $eml2mbox->($eml, $kw); my $lock = $pipe_lk->lock_for_scope if $pipe_lk; write_in_full($out, $buf, $atomic); } } } +sub write_cb { # returns a callback for git_to_mail + my ($cls, $dst, $lei) = @_; + if ($dst =~ s!\A(mbox(?:rd|cl|cl2|o))?:!!) { + _mbox_write_cb($cls, $1, $dst, $lei); + } +} + 1; diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm index 7fd17745..f6eaa5ce 100644 --- a/lib/PublicInbox/Lock.pm +++ b/lib/PublicInbox/Lock.pm @@ -8,6 +8,7 @@ use v5.10.1; use Fcntl qw(:flock :DEFAULT); use Carp qw(croak); use PublicInbox::OnDestroy; +use File::Temp (); # we only acquire the flock if creating or reindexing; # PublicInbox::Import already has the lock on its own. 
@@ -40,4 +41,10 @@ sub lock_for_scope { PublicInbox::OnDestroy->new(\&lock_release, $self); } +sub new_tmp { + my ($cls, $ident) = @_; + my $tmp = File::Temp->new("$ident.lock-XXXXXX", TMPDIR => 1); + bless { lock_path => $tmp->filename, tmp => $tmp }, $cls; +} + 1; diff --git a/lib/PublicInbox/MboxReader.pm b/lib/PublicInbox/MboxReader.pm index e1944aaf..ac0c0f52 100644 --- a/lib/PublicInbox/MboxReader.pm +++ b/lib/PublicInbox/MboxReader.pm @@ -5,6 +5,7 @@ package PublicInbox::MboxReader; use strict; use v5.10.1; +use PublicInbox::DS (); # localize $in_loop for error detection :< use Data::Dumper; $Data::Dumper::Useqq = 1; # should've been the default, for bad data @@ -13,6 +14,7 @@ my $from_strict = sub _mbox_from { my ($mbfh, $from_re, $eml_cb, @arg) = @_; + local $PublicInbox::DS::in_loop; # disable dwaitpid my $buf = ''; my @raw; while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) { @@ -73,6 +75,7 @@ sub _extract_hdr { sub _mbox_cl ($$$;@) { my ($mbfh, $uxs_from, $eml_cb, @arg) = @_; + local $PublicInbox::DS::in_loop; # disable dwaitpid my $buf = ''; while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) { if ($r == 0) { # detect "curl --fail" diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index 231cf543..e4551e69 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -92,21 +92,40 @@ my $orig = do { is($raw, do { local $/; <$fh> }, 'jobs > 1'); $raw; }; -SKIP: { - use PublicInbox::Spawn qw(which); - my $gzip = which('gzip') or skip 'gzip not found', 1; - my $wcb = PublicInbox::LeiToMail->write_cb("mboxcl2:$fn.gz", $lei); - $wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]); - undef $wcb; - my $uncompressed = xqx([$gzip, '-dc', "$fn.gz"]); - is($uncompressed, $orig, 'gzip works'); +for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? 
+ my $zsfx2cmd = PublicInbox::LeiToMail->can('zsfx2cmd'); + SKIP: { + my $cmd = eval { $zsfx2cmd->($zsfx, 0, $lei) }; + skip $@, 3 if $@; + my $dc_cmd = eval { $zsfx2cmd->($zsfx, 1, $lei) }; + ok($dc_cmd, "decompressor for .$zsfx"); + my $f = "$fn.$zsfx"; + my $dst = "mboxcl2:$f"; + my $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei); + $wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]); + undef $wcb; + my $uncompressed = xqx([@$dc_cmd, $f]); + is($uncompressed, $orig, "$zsfx works unlocked"); - local $lei->{opt} = { jobs => 2 }; - unlink "$fn.gz" or die "unlink $!"; - $wcb = PublicInbox::LeiToMail->write_cb("mboxcl2:$fn.gz", $lei); - $wcb->(\(my $dupe = $buf), 'deadbeef', [ qw(seen) ]); - undef $wcb; - is(xqx([$gzip, '-dc', "$fn.gz"]), $orig); + local $lei->{opt} = { jobs => 2 }; # for atomic writes + unlink $f or BAIL_OUT "unlink $!"; + $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei); + $wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]); + undef $wcb; + is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock"); + } +} + +unlink $fn or BAIL_OUT $!; +if ('default deduplication uses content_hash') { + my $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei); + $wcb->(\(my $x = $buf), 'deadbeef', []) for (1..2); + undef $wcb; # undef to commit changes + my $cmp = ''; + open my $fh, '<', $fn or BAIL_OUT $!; + require PublicInbox::MboxReader; + PublicInbox::MboxReader->mboxo($fh, sub { $cmp .= shift->as_string }); + is($cmp, $buf, 'only one message written'); } done_testing;