* [PATCH 1/2] lei q: parallelize Maildir and mbox writing
2021-01-18 10:30 [PATCH 0/2] lei q: write faster, mutt does less work Eric Wong
@ 2021-01-18 10:30 ` Eric Wong
2021-01-18 21:19 ` Eric Wong
2021-01-18 10:30 ` [PATCH 2/2] lei_to_mail: optimize for MUAs Eric Wong
1 sibling, 1 reply; 4+ messages in thread
From: Eric Wong @ 2021-01-18 10:30 UTC (permalink / raw)
To: meta
With 4 dedicated workers, this seems to provide a 100-120%
speedup on a 4 core machine when writing thousands of search
results to a Maildir or mbox. This also sets us up for
high-latency IMAP destinations in the future.
This opens the door to more speedup opportunities such
as optimizing dedupe locking and other ways to reduce
contention.
This change is fairly complex and convoluted, unfortunately.
Further work may allow us to simplify it and even improve
performance.
---
lib/PublicInbox/IPC.pm | 3 +++
lib/PublicInbox/LEI.pm | 36 ++++++++++++++++++++++++----------
lib/PublicInbox/LeiOverview.pm | 36 +++++++++++++++++++++++++++++++---
lib/PublicInbox/LeiQuery.pm | 12 +++++++++---
lib/PublicInbox/LeiToMail.pm | 29 +++++++++++++++++++++++++++
lib/PublicInbox/LeiXSearch.pm | 27 +++++++++++++++----------
lib/PublicInbox/Spawn.pm | 2 +-
7 files changed, 118 insertions(+), 27 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 78cb8400..8fec2e62 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -397,4 +397,7 @@ sub DESTROY {
ipc_worker_stop($self);
}
+# Sereal doesn't have dclone
+sub deep_clone { thaw(freeze($_[-1])) }
+
1;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 61f2a65b..6b6ee0f5 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -279,13 +279,21 @@ sub atfork_prepare_wq {
if (my $sock = $self->{sock}) {
push @$tcafc, @$self{qw(0 1 2)}, $sock;
}
+ for my $f (qw(lxs l2m)) {
+ my $ipc = $self->{$f} or next;
+ push @$tcafc, grep { defined }
+ @$ipc{qw(-wq_s1 -wq_s2 -ipc_req -ipc_res)};
+ }
}
# usage: my %sig = $lei->atfork_child_wq($wq);
# local @SIG{keys %sig} = values %sig;
sub atfork_child_wq {
my ($self, $wq) = @_;
- @$self{qw(0 1 2 sock)} = delete(@$wq{0..3});
+ my ($sock, $l2m_wq_s1);
+ (@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4});
+ $self->{sock} = $sock if -S $sock;
+ $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1;
%PATH2CFG = ();
$quit = \&CORE::exit;
@TO_CLOSE_ATFORK_CHILD = ();
@@ -304,15 +312,23 @@ sub atfork_child_wq {
# usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
sub atfork_parent_wq {
my ($self, $wq) = @_;
- if ($wq->wq_workers) {
- my $env = delete $self->{env}; # env is inherited at fork
- my $ret = bless { %$self }, ref($self);
- $self->{env} = $env;
- delete @$ret{qw(-lei_store cfg pgr)};
- ($ret, delete @$ret{0..2}, delete($ret->{sock}) // ());
- } else {
- ($self, @$self{0..2}, $self->{sock} // ());
+ my $env = delete $self->{env}; # env is inherited at fork
+ my $ret = bless { %$self }, ref($self);
+ if (my $dedupe = delete $ret->{dedupe}) {
+ $ret->{dedupe} = $wq->deep_clone($dedupe);
+ }
+ $self->{env} = $env;
+ delete @$ret{qw(-lei_store cfg pgr lxs)}; # keep l2m
+ my @io = delete @$ret{0..2};
+ $io[3] = delete($ret->{sock}) // *STDERR{GLOB};
+ my $l2m = $ret->{l2m};
+ if ($l2m && $l2m != $wq) {
+ $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1};
+ if (my @pids = $l2m->wq_close) {
+ $wq->{l2m_pids} = \@pids;
+ }
}
+ ($ret, @io);
}
sub _help ($;$) {
@@ -656,7 +672,7 @@ sub start_mua {
@cmd = map { $_ eq '%f' ? ($replaced = $mfolder) : $_ } @cmd;
push @cmd, $mfolder unless defined($replaced);
$sock //= $self->{sock};
- if ($sock) { # lei(1) client process runs it
+ if ($PublicInbox::DS::in_loop) { # lei(1) client process runs it
send($sock, exec_buf(\@cmd, {}), MSG_EOR);
} else { # oneshot
$self->{"mua.pid.$self.$$"} = spawn(\@cmd);
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index c0b423f6..538d6bd5 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -140,6 +140,16 @@ sub _unbless_smsg {
sub ovv_atexit_child {
my ($self, $lei) = @_;
+ if (my $l2m = delete $lei->{l2m}) {
+ # gracefully stop lei2mail processes after all
+ # ->write_mail work is complete
+ delete $l2m->{-wq_s1};
+ if (my $rd = delete $l2m->{each_smsg_done}) {
+ read($rd, my $buf, 1); # wait for EOF
+ }
+ }
+ # order matters, git->{-tmp}->DESTROY must not fire until
+ # {each_smsg_done} hits EOF above
if (my $git = delete $self->{git}) {
$git->async_wait_all;
}
@@ -178,8 +188,6 @@ sub _json_pretty {
sub ovv_each_smsg_cb { # runs in wq worker usually
my ($self, $lei, $ibxish) = @_;
- $lei->{ovv_buf} = \(my $buf = '');
- delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
my $json;
$lei->{1}->autoflush(1);
if (my $pkg = $self->{json}) {
@@ -187,7 +195,27 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
$json->utf8->canonical;
$json->ascii(1) if $lei->{opt}->{ascii};
}
- if (my $l2m = $lei->{l2m}) {
+ my $l2m = $lei->{l2m};
+ if ($l2m && $l2m->{-wq_s1}) {
+ my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m);
+ # n.b. $io[0] = qry_status_wr, $io[1] = mbox|stdout,
+ # $io[4] becomes a notification pipe that triggers EOF
+ # in this wq worker when all outstanding ->write_mail
+ # calls are complete
+ die "BUG: \$io[4] $io[4] unexpected" if $io[4];
+ pipe($l2m->{each_smsg_done}, $io[4]) or die "pipe: $!";
+ fcntl($io[4], 1031, 4096) if $^O eq 'linux';
+ delete @$lei_ipc{qw(l2m opt mset_opt cmd)};
+ my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
+ $self->{git} = $git;
+ my $git_dir = $git->{git_dir};
+ sub {
+ my ($smsg, $mitem) = @_;
+ my $kw = []; # TODO get from mitem
+ $l2m->wq_do('write_mail', \@io, $git_dir,
+ $smsg->{blob}, $lei_ipc, $kw)
+ }
+ } elsif ($l2m) {
my $wcb = $l2m->write_cb($lei);
my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
$self->{git} = $git; # for ovv_atexit_child
@@ -199,6 +227,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
};
} elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
+ $lei->{ovv_buf} = \(my $buf = '');
sub { # DIY prettiness :P
my ($smsg, $mitem) = @_;
$smsg = _unbless_smsg($smsg, $mitem);
@@ -221,6 +250,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
}
} elsif ($json) {
my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
+ $lei->{ovv_buf} = \(my $buf = '');
sub {
my ($smsg, $mitem) = @_;
delete @$smsg{qw(tid num)};
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index a80d5887..d6e801e3 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -41,11 +41,17 @@ sub lei_q {
$j = 1 if !$opt->{thread};
$j++ if $opt->{'local'}; # for sto->search below
$self->atfork_prepare_wq($lxs);
- $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
- // $lxs->wq_workers($j);
+ $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset);
+ $self->{lxs} = $lxs;
- # no forking workers after this
my $ovv = PublicInbox::LeiOverview->new($self) or return;
+ if (my $l2m = $self->{l2m}) {
+ $j = 4 if $j <= 4; # TODO configurable
+ $self->atfork_prepare_wq($l2m);
+ $l2m->wq_workers_start('lei2mail', $j, $self->oldset);
+ }
+
+ # no forking workers after this
my $sto = $self->_lei_store(1);
unshift(@srcs, $sto->search) if $opt->{'local'};
my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 0e23b8da..17d48a90 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -5,6 +5,7 @@
package PublicInbox::LeiToMail;
use strict;
use v5.10.1;
+use parent qw(PublicInbox::IPC);
use PublicInbox::Eml;
use PublicInbox::Lock;
use PublicInbox::ProcessPipe;
@@ -14,6 +15,8 @@ use Symbol qw(gensym);
use IO::Handle; # ->autoflush
use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
use Errno qw(EEXIST ESPIPE ENOENT);
+use File::Temp 0.19 (); # 0.19 for ->newdir
+use PublicInbox::Git;
my %kw2char = ( # Maildir characters
draft => 'D',
@@ -422,4 +425,30 @@ sub lock_free {
$_[0]->{base_type} =~ /\A(?:maildir|mh|imap|jmap)\z/ ? 1 : 0;
}
+sub write_mail { # via ->wq_do
+ my ($self, $git_dir, $oid, $lei, $kw) = @_;
+ my $wcb = $self->{wcb} //= do { # first message
+ my %sig = $lei->atfork_child_wq($self);
+ @SIG{keys %sig} = values %sig; # not local
+ $lei->{dedupe}->prepare_dedupe;
+ $self->write_cb($lei);
+ };
+ my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
+ $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw ]);
+}
+
+sub ipc_atfork_prepare {
+ my ($self) = @_;
+ # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr)
+ $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= >&=]);
+ $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
+}
+
+sub DESTROY {
+ my ($self) = @_;
+ for my $pid_git (grep(/\A$$\0/, keys %$self)) {
+ $self->{$pid_git}->async_wait_all;
+ }
+}
+
1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 91864cd0..dc5cf3b6 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -126,6 +126,7 @@ sub query_thread_mset { # for --thread
@{$ctx->{xids}} = ();
}
} while (_mset_more($mset, $mo));
+ undef $each_smsg; # drops @io for l2m->{each_smsg_done}
$lei->{ovv}->ovv_atexit_child($lei);
}
@@ -147,6 +148,7 @@ sub query_mset { # non-parallel for non-"--thread" users
$each_smsg->($smsg, $it);
}
} while (_mset_more($mset, $mo));
+ undef $each_smsg; # drops @io for l2m->{each_smsg_done}
$lei->{ovv}->ovv_atexit_child($lei);
}
@@ -170,11 +172,14 @@ sub git {
}
sub query_done { # EOF callback
- my ($lei) = @_;
- $lei->{ovv}->ovv_end($lei);
- if (my $l2m = $lei->{l2m}) {
- $lei->start_mua unless $l2m->lock_free;
+ my ($self, $lei) = @_;
+ my $l2m = delete $lei->{l2m};
+ if (my $pids = delete $self->{l2m_pids}) {
+ my $ipc_worker_reap = $self->can('ipc_worker_reap');
+ dwaitpid($_, $ipc_worker_reap, $l2m) for @$pids;
}
+ $lei->{ovv}->ovv_end($lei);
+ $lei->start_mua if $l2m && !$l2m->lock_free;
$lei->dclose;
}
@@ -188,12 +193,10 @@ sub start_query { # always runs in main (lei-daemon) process
}
my $remotes = $self->{remotes} // [];
if ($lei->{opt}->{thread}) {
- $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
for my $ibxish (@$srcs) {
$self->wq_do('query_thread_mset', $io, $lei, $ibxish);
}
} else {
- $lei->{-parallel} = scalar(@$remotes);
$self->wq_do('query_mset', $io, $lei, $srcs);
}
# TODO
@@ -226,12 +229,12 @@ sub do_query {
$io[0] = undef;
pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
- $lei_orig->{lxs} = $self;
$lei_orig->event_step_init; # wait for shutdowns
- my $op_map = { '' => [ \&query_done, $lei_orig ] };
+ my $op_map = { '' => [ \&query_done, $self, $lei_orig ] };
my $in_loop = exists $lei_orig->{sock};
my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
- if (my $l2m = $lei->{l2m}) {
+ my $l2m = $lei->{l2m};
+ if ($l2m) {
$l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
$io[1] = $lei_orig->{1};
$op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
@@ -246,13 +249,17 @@ sub do_query {
$op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
$opp->event_step;
my $ipc_worker_reap = $self->can('ipc_worker_reap');
+ if (my $l2m_pids = delete $self->{l2m_pids}) {
+ dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
+ }
dwaitpid($_, $ipc_worker_reap, $self) for @pids;
}
}
sub ipc_atfork_prepare {
my ($self) = @_;
- $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
+ # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1})
+ $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]);
$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
}
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index e5c0b1e9..b03f2d59 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS';
#include <sys/socket.h>
#if defined(CMSG_SPACE) && defined(CMSG_LEN)
-#define SEND_FD_CAPA 4
+#define SEND_FD_CAPA 5
#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
union my_cmsg {
struct cmsghdr hdr;
^ permalink raw reply related [flat|nested] 4+ messages in thread
* [PATCH 2/2] lei_to_mail: optimize for MUAs
2021-01-18 10:30 [PATCH 0/2] lei q: write faster, mutt does less work Eric Wong
2021-01-18 10:30 ` [PATCH 1/2] lei q: parallelize Maildir and mbox writing Eric Wong
@ 2021-01-18 10:30 ` Eric Wong
1 sibling, 0 replies; 4+ messages in thread
From: Eric Wong @ 2021-01-18 10:30 UTC (permalink / raw)
To: meta
Instead of optimizing our own performance, this optimizes
our data to reduce work done by the MUA consumer.
Maildir and mbox destinations no longer support any notion of
the IMAP \Recent flag. JMAP has no functioning \Recent
equivalent, and neither do we.
In practice, having MUAs (e.g. mutt) clear the \Recent flag when
committing changes to the mbox is expensive: it creates a
rename(2) storm with Maildir and overwrites the entire mbox.
For mboxcl2 (and mboxcl), we'll further optimize mutt behavior
by setting the Lines: header in addition to Content-Length.
With these changes, mutt exits instantaneously on mboxcl2,
mboxcl, and Maildirs generated by "lei q".
---
lib/PublicInbox/LeiToMail.pm | 30 ++++++++++++++++++++++--------
t/lei_to_mail.t | 16 +++++++++++-----
t/mbox_reader.t | 2 ++
3 files changed, 35 insertions(+), 13 deletions(-)
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 17d48a90..1281aef4 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -35,7 +35,11 @@ my %kw2status = (
sub _mbox_hdr_buf ($$$) {
my ($eml, $type, $kw) = @_;
$eml->header_set($_) for (qw(Lines Bytes Content-Length));
- my %hdr; # set Status, X-Status
+
+ # Messages are always 'O' (non-\Recent in IMAP), it saves
+ # MUAs the trouble of rewriting the mbox if no other
+ # changes are made
+ my %hdr = (Status => [ 'O' ]); # set Status, X-Status
for my $k (@$kw) {
if (my $ent = $kw2status{$k}) {
push @{$hdr{$ent->[0]}}, $ent->[1];
@@ -93,6 +97,16 @@ sub eml2mboxo {
$buf;
}
+sub _mboxcl_common ($$$) {
+ my ($buf, $bdy, $crlf) = @_;
+ # add Lines: so mutt won't have to add it on MUA close
+ my $lines = $$bdy =~ tr!\n!\n!;
+ $$buf .= 'Content-Length: '.length($$bdy).$crlf.
+ 'Lines: '.$lines.$crlf.$crlf;
+ substr($$bdy, 0, 0, $$buf); # prepend header
+ $_[0] = $bdy;
+}
+
# mboxcl still escapes "From " lines
sub eml2mboxcl {
my ($eml, $kw) = @_;
@@ -100,9 +114,7 @@ sub eml2mboxcl {
my $crlf = $eml->{crlf};
if (my $bdy = delete $eml->{bdy}) {
$$bdy =~ s/^From />From /gm;
- $$buf .= 'Content-Length: '.length($$bdy).$crlf.$crlf;
- substr($$bdy, 0, 0, $$buf); # prepend header
- $buf = $bdy;
+ _mboxcl_common($buf, $bdy, $crlf);
}
$$buf .= $crlf;
$buf;
@@ -114,9 +126,7 @@ sub eml2mboxcl2 {
my $buf = _mbox_hdr_buf($eml, 'mboxcl2', $kw);
my $crlf = $eml->{crlf};
if (my $bdy = delete $eml->{bdy}) {
- $$buf .= 'Content-Length: '.length($$bdy).$crlf.$crlf;
- substr($$bdy, 0, 0, $$buf); # prepend header
- $buf = $bdy;
+ _mboxcl_common($buf, $bdy, $crlf);
}
$$buf .= $crlf;
$buf;
@@ -277,7 +287,11 @@ sub _buf2maildir {
} while (!sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY) &&
$! == EEXIST && ($rand = int(rand 0x7fffffff).','));
if (print $fh $$buf and close($fh)) {
- $dst .= $sfx eq '' ? 'new/' : 'cur/';
+ # ignore new/ and write only to cur/, otherwise MUAs
+ # with R/W access to the Maildir will end up doing
+ # a mass rename which can take a while with thousands
+ # of messages.
+ $dst .= 'cur/';
$rand = '';
do {
$final = $dst.$rand."oid=$oid:2,$sfx";
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index 083e0df4..e5ac8eac 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -25,7 +25,7 @@ for my $mbox (@MBOX) {
my $s = $cb->(PublicInbox::Eml->new($from), $kw);
is(substr($$s, -1, 1), "\n", "trailing LF in normal $mbox");
my $eml = PublicInbox::Eml->new($s);
- is($eml->header('Status'), 'R', "Status: set by $m");
+ is($eml->header('Status'), 'OR', "Status: set by $m");
is($eml->header('X-Status'), 'AF', "X-Status: set by $m");
if ($mbox eq 'mboxcl2') {
like($eml->body_raw, qr/^From /, "From not escaped $m");
@@ -170,6 +170,12 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
}
}
+my $as_orig = sub {
+ my ($eml) = @_;
+ $eml->header_set('Status');
+ $eml->as_string;
+};
+
unlink $fn or BAIL_OUT $!;
if ('default deduplication uses content_hash') {
my $wcb = $wcb_get->('mboxo', $fn);
@@ -177,7 +183,7 @@ if ('default deduplication uses content_hash') {
undef $wcb; # undef to commit changes
my $cmp = '';
open my $fh, '<', $fn or BAIL_OUT $!;
- PublicInbox::MboxReader->mboxo($fh, sub { $cmp .= shift->as_string });
+ PublicInbox::MboxReader->mboxo($fh, sub { $cmp .= $as_orig->(@_) });
is($cmp, $buf, 'only one message written');
local $lei->{opt} = { augment => 1 };
@@ -186,7 +192,7 @@ if ('default deduplication uses content_hash') {
undef $wcb; # undef to commit changes
open $fh, '<', $fn or BAIL_OUT $!;
my @x;
- PublicInbox::MboxReader->mboxo($fh, sub { push @x, shift->as_string });
+ PublicInbox::MboxReader->mboxo($fh, sub { push @x, $as_orig->(@_) });
is(scalar(@x), 2, 'augmented mboxo');
is($x[0], $cmp, 'original message preserved');
is($x[1], $buf . "\nx\n", 'new message appended');
@@ -200,7 +206,7 @@ if ('default deduplication uses content_hash') {
undef $wcb; # commit
seek($tmp, 0, SEEK_SET) or BAIL_OUT $!;
my $cmp = '';
- PublicInbox::MboxReader->mboxrd($tmp, sub { $cmp .= shift->as_string });
+ PublicInbox::MboxReader->mboxrd($tmp, sub { $cmp .= $as_orig->(@_) });
is($cmp, $buf, 'message written to stdout');
}
@@ -213,7 +219,7 @@ SKIP: { # FIFO support
$wcb->(\(my $x = $buf), 'deadbeef', []);
undef $wcb; # commit
my $cmp = '';
- PublicInbox::MboxReader->mboxo($cat, sub { $cmp .= shift->as_string });
+ PublicInbox::MboxReader->mboxo($cat, sub { $cmp .= $as_orig->(@_) });
is($cmp, $buf, 'message written to FIFO');
}
diff --git a/t/mbox_reader.t b/t/mbox_reader.t
index 4ea2ae29..30a5e6e3 100644
--- a/t/mbox_reader.t
+++ b/t/mbox_reader.t
@@ -45,6 +45,8 @@ my $check_fmt = sub {
seek($fh, 0, SEEK_SET) or BAIL_OUT "seek: $!";
$reader->$fmt($fh, sub {
my ($eml) = @_;
+ $eml->header_set('Status');
+ $eml->header_set('Lines');
my $cur = shift @order;
my @cl = $eml->header_raw('Content-Length');
if ($fmt =~ /\Amboxcl/) {
^ permalink raw reply related [flat|nested] 4+ messages in thread