unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
* [PATCH 0/9] lei + import-related updates
@ 2023-10-11  7:20 Eric Wong
  2023-10-11  7:20 ` [PATCH 1/9] lei rediff: use ProcessIO for --drq support Eric Wong
                   ` (8 more replies)
  0 siblings, 9 replies; 10+ messages in thread
From: Eric Wong @ 2023-10-11  7:20 UTC (permalink / raw)
  To: meta

A few more ProcessIO conversions to start with, and then
cleanups while I started working on import-related stuff.
Some of this will tie in nicely for FUSE, too...

I've realized msgtime warnings were pointless anyway since
there's nothing anybody can really do about bad messages that
get through various upstream spam filters.

5/9 is a long-overdue cleanup I noticed while going
over Import.pm

9/9 ought to fix the fragile t/lei-store-fail.t test
by using new features.

Eric Wong (9):
  lei rediff: use ProcessIO for --drq support
  lei_xsearch: improve curl progress reporting
  msgtime: quiet warnings we can do nothing about
  msgtime: simplify msg_timestamp and msg_datestamp
  treewide: consolidate "From " line removal
  import: switch to Unix stream socket for fast-import
  import: cat_blob is a no-op w/o live fast-import
  lei blob: run cat_blob on lei/store for pending blobs
  lei import|tag|rm: support --commit-delay=SECONDS

 lib/PublicInbox/Eml.pm        |   6 ++
 lib/PublicInbox/IMAP.pm       |   2 +-
 lib/PublicInbox/Import.pm     | 138 ++++++++++++++++------------------
 lib/PublicInbox/LEI.pm        |  23 +++---
 lib/PublicInbox/LeiBlob.pm    |  16 ++--
 lib/PublicInbox/LeiInput.pm   |   5 +-
 lib/PublicInbox/LeiInspect.pm |   2 +-
 lib/PublicInbox/LeiRediff.pm  |  33 ++++----
 lib/PublicInbox/LeiStore.pm   |  11 +++
 lib/PublicInbox/LeiToMail.pm  |   3 +-
 lib/PublicInbox/LeiXSearch.pm |  34 +++++----
 lib/PublicInbox/Mbox.pm       |  16 ++--
 lib/PublicInbox/MboxReader.pm |   2 +-
 lib/PublicInbox/MsgTime.pm    |  49 +++++-------
 lib/PublicInbox/NNTP.pm       |   3 +-
 lib/PublicInbox/ProcessIO.pm  |  18 ++---
 lib/PublicInbox/Spawn.pm      |   1 +
 script/public-inbox-convert   |  18 ++---
 script/public-inbox-edit      |   5 +-
 script/public-inbox-learn     |   2 +-
 script/public-inbox-mda       |   4 +-
 script/public-inbox-purge     |   4 +-
 t/lei-import.t                |  13 ++++
 t/lei-store-fail.t            |  20 +++--
 t/lei-tag.t                   |  15 +++-
 25 files changed, 230 insertions(+), 213 deletions(-)


^ permalink raw reply	[flat|nested] 10+ messages in thread

* [PATCH 1/9] lei rediff: use ProcessIO for --drq support
  2023-10-11  7:20 [PATCH 0/9] lei + import-related updates Eric Wong
@ 2023-10-11  7:20 ` Eric Wong
  2023-10-11  7:20 ` [PATCH 2/9] lei_xsearch: improve curl progress reporting Eric Wong
                   ` (7 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2023-10-11  7:20 UTC (permalink / raw)
  To: meta

This required fixing binmode support a few commits ago, along
with properly enabling autoflush in popen_wr instead of setting
it on the wrapper ProcessIO class.
---
 lib/PublicInbox/LeiRediff.pm | 33 ++++++++++++++-------------------
 lib/PublicInbox/ProcessIO.pm | 18 +++++-------------
 lib/PublicInbox/Spawn.pm     |  1 +
 3 files changed, 20 insertions(+), 32 deletions(-)

diff --git a/lib/PublicInbox/LeiRediff.pm b/lib/PublicInbox/LeiRediff.pm
index b894342b..230f3e83 100644
--- a/lib/PublicInbox/LeiRediff.pm
+++ b/lib/PublicInbox/LeiRediff.pm
@@ -138,35 +138,30 @@ EOM
 	undef;
 }
 
-sub wait_requote { # OnDestroy callback
-	my ($lei, $pid, $old_1) = @_;
-	$lei->{1} = $old_1; # closes stdin of `perl -pe 's/^/> /'`
-	waitpid($pid, 0) == $pid or die "BUG(?) waitpid: \$!=$! \$?=$?";
-	$lei->child_error($?) if $?;
-}
+# awaitpid callback
+sub wait_requote { $_[1]->child_error($?) if $? }
 
-sub requote ($$) {
+sub requote ($$) { # '> ' prefix(es) lei->{1}
 	my ($lei, $pfx) = @_;
-	my $old_1 = $lei->{1};
-	my $opt = { 1 => $old_1, 2 => $lei->{2} };
+	my $opt = { 1 => $lei->{1}, 2 => $lei->{2} };
 	# $^X (perl) is overkill, but maybe there's a weird system w/o sed
-	my ($w, $pid) = popen_wr([$^X, '-pe', "s/^/$pfx/"], $lei->{env}, $opt);
-	$w->autoflush(1);
-	binmode $w, ':utf8'; # incompatible with ProcessIO due to syswrite
-	$lei->{1} = $w;
-	PublicInbox::OnDestroy->new(\&wait_requote, $lei, $pid, $old_1);
+	my $w = popen_wr([$^X, '-pe', "s/^/$pfx/"], $lei->{env}, $opt,
+			 \&wait_requote, $lei);
+	binmode $w, ':utf8';
+	$w;
 }
 
 sub extract_oids { # Eml each_part callback
 	my ($ary, $self) = @_;
+	my $lei = $self->{lei};
 	my ($p, undef, $idx) = @$ary;
-	$self->{lei}->out($p->header_obj->as_string, "\n");
+	$lei->out($p->header_obj->as_string, "\n");
 	my ($s, undef) = msg_part_text($p, $p->content_type || 'text/plain');
 	defined $s or return;
-	my $rq;
-	if ($self->{dqre} && $s =~ s/$self->{dqre}//g) { # '> ' prefix(es)
-		$rq = requote($self->{lei}, $1) if $self->{lei}->{opt}->{drq};
-	}
+
+	$self->{dqre} && $s =~ s/$self->{dqre}//g && $lei->{opt}->{drq} and
+		local $lei->{1} = requote($lei, $1);
+
 	my @top = split($PublicInbox::ViewDiff::EXTRACT_DIFFS, $s);
 	undef $s;
 	my $blobs = $self->{blobs}; # blobs to resolve
diff --git a/lib/PublicInbox/ProcessIO.pm b/lib/PublicInbox/ProcessIO.pm
index f120edd0..ea5d3e6c 100644
--- a/lib/PublicInbox/ProcessIO.pm
+++ b/lib/PublicInbox/ProcessIO.pm
@@ -7,6 +7,7 @@ package PublicInbox::ProcessIO;
 use v5.12;
 use PublicInbox::DS qw(awaitpid);
 use Symbol qw(gensym);
+use bytes qw(length);
 
 sub maybe_new {
 	my ($cls, $pid, $fh, @cb_arg) = @_;
@@ -31,25 +32,16 @@ sub TIEHANDLE {
 	$self;
 }
 
-# for IO::Uncompress::Gunzip
-sub BINMODE {
-	return binmode($_[0]->{fh}) if @_ == 1;
-	binmode $_[0]->{fh}, $_[1];
-}
+# for IO::Uncompress::Gunzip and PublicInbox::LeiRediff
+sub BINMODE { @_ == 1 ? binmode($_[0]->{fh}) : binmode($_[0]->{fh}, $_[1]) }
 
 sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) }
 
 sub READLINE { readline($_[0]->{fh}) }
 
-sub WRITE {
-	use bytes qw(length);
-	syswrite($_[0]->{fh}, $_[1], $_[2] // length($_[1]), $_[3] // 0);
-}
+sub WRITE { syswrite($_[0]->{fh}, $_[1], $_[2] // length($_[1]), $_[3] // 0) }
 
-sub PRINT {
-	my $self = shift;
-	print { $self->{fh} } @_;
-}
+sub PRINT { print { $_[0]->{fh} } @_[1..$#_] }
 
 sub FILENO { fileno($_[0]->{fh}) }
 
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 265638fe..106f5e01 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -376,6 +376,7 @@ sub popen_rd {
 sub popen_wr {
 	my ($cmd, $env, $opt, @cb_arg) = @_;
 	pipe(local $opt->{0}, my $w) or die "pipe: $!\n";
+	$w->autoflush(1);
 	my $pid = spawn($cmd, $env, $opt);
 	PublicInbox::ProcessIO->maybe_new($pid, $w, @cb_arg)
 }

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 2/9] lei_xsearch: improve curl progress reporting
  2023-10-11  7:20 [PATCH 0/9] lei + import-related updates Eric Wong
  2023-10-11  7:20 ` [PATCH 1/9] lei rediff: use ProcessIO for --drq support Eric Wong
@ 2023-10-11  7:20 ` Eric Wong
  2023-10-11  7:20 ` [PATCH 3/9] msgtime: quiet warnings we can do nothing about Eric Wong
                   ` (6 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2023-10-11  7:20 UTC (permalink / raw)
  To: meta

Instead of having tail(1) follow a temporary file when we're in
verbose mode, pipe curl's stderr to a Perl 2-liner which tees its
output to an anonymous temporary file with line buffering.

POSIX tee(1) isn't suitable for this task since it's required
to be completely unbuffered while we want line-buffering when
running parallel processes.  Fortunately, Perl makes this easy.

This also means we no longer leave curl.err-XXXX files around
on premature shutdown if we're hit by a SIGKILL or similar and
can't exit normally.

We do need to stop and respawn the Perl process if we hit a curl
error, though, since we need to be certain the output is
flushed.
---
 lib/PublicInbox/LeiXSearch.pm | 34 +++++++++++++++++++---------------
 1 file changed, 19 insertions(+), 15 deletions(-)

diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index fbafa324..2a4af3e7 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -12,16 +12,15 @@ use PublicInbox::DS qw(now);
 use File::Temp 0.19 (); # 0.19 for ->newdir
 use File::Spec ();
 use PublicInbox::Search qw(xap_terms);
-use PublicInbox::Spawn qw(popen_rd spawn which);
+use PublicInbox::Spawn qw(popen_rd popen_wr which);
 use PublicInbox::MID qw(mids);
 use PublicInbox::Smsg;
-use PublicInbox::AutoReap;
 use PublicInbox::Eml;
 use PublicInbox::LEI;
 use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR);
 use PublicInbox::ContentHash qw(git_sha);
 use POSIX qw(strftime);
-use autodie qw(read seek truncate);
+use autodie qw(open read seek truncate);
 
 sub new {
 	my ($class) = @_;
@@ -330,18 +329,20 @@ sub query_remote_mboxrd {
 	$qstr =~ s/[ \n\t]+/ /sg; # make URLs less ugly
 	my @qform = (x => 'm');
 	push(@qform, t => 1) if $opt->{threads};
-	my $verbose = $opt->{verbose};
-	my $reap_tail;
-	my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1);
-	fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!";
+	open my $cerr, '+>', undef;
 	my $rdr = { 2 => $cerr };
-	if ($verbose) {
-		# spawn a process to force line-buffering, otherwise curl
+	my @lbf_tee;
+	if ($opt->{verbose}) {
+		# spawn a line-buffered tee(1) script, otherwise curl
 		# will write 1 character at-a-time and parallel outputs
 		# mmmaaayyy llloookkk llliiikkkeee ttthhhiiisss
-		my $o = { 1 => $lei->{2}, 2 => $lei->{2} };
-		my $pid = spawn(['tail', '-f', $cerr->filename], undef, $o);
-		$reap_tail = PublicInbox::AutoReap->new($pid);
+		# (n.b. POSIX tee(1) cannot do any buffering)
+		my $o = { 1 => $cerr, 2 => $lei->{2} };
+		delete $rdr->{2};
+		@lbf_tee = ([ $^X, qw(-w -p -e), <<'' ], undef, $o);
+BEGIN { $| = 1; use IO::Handle; STDERR->autoflush(1); }
+print STDERR $_;
+
 	}
 	my $curl = PublicInbox::LeiCurl->new($lei, $self->{curl}) or return;
 	push @$curl, '-s', '-d', '';
@@ -354,6 +355,7 @@ sub query_remote_mboxrd {
 		$uri->query_form(@qform, q => $q);
 		my $cmd = $curl->for_uri($lei, $uri);
 		$lei->qerr("# $cmd");
+		$rdr->{2} //= popen_wr(@lbf_tee) if @lbf_tee;
 		my $cfh = popen_rd($cmd, undef, $rdr);
 		my $fh = IO::Uncompress::Gunzip->new($cfh, MultiStream => 1);
 		PublicInbox::MboxReader->mboxrd($fh, \&each_remote_eml, $self,
@@ -361,17 +363,19 @@ sub query_remote_mboxrd {
 		$lei->sto_done_request if delete($self->{-sto_imported});
 		my $nr = delete $lei->{-nr_remote_eml} // 0;
 		close $cfh;
-		if ($? == 0) { # don't update if no results, maybe MTA is down
+		my $code = $?;
+		if (!$code) { # don't update if no results, maybe MTA is down
 			$lei->{lss}->cfg_set($key, $start) if $key && $nr;
 			mset_progress($lei, $lei->{-current_url}, $nr, $nr);
 			next;
 		}
+		close(delete($rdr->{2})) if @lbf_tee;
 		seek($cerr, 0, SEEK_SET);
 		read($cerr, my $err, -s $cerr);
 		truncate($cerr, 0);
-		next if (($? >> 8) == 22 && $err =~ /\b404\b/);
+		next if (($code >> 8) == 22 && $err =~ /\b404\b/);
 		$uri->query_form(q => $qstr);
-		$lei->child_error($?, "E: <$uri> $err");
+		$lei->child_error($code, "E: <$uri> `$cmd` failed");
 	}
 	undef $each_smsg;
 	$lei->{ovv}->ovv_atexit_child($lei);

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 3/9] msgtime: quiet warnings we can do nothing about
  2023-10-11  7:20 [PATCH 0/9] lei + import-related updates Eric Wong
  2023-10-11  7:20 ` [PATCH 1/9] lei rediff: use ProcessIO for --drq support Eric Wong
  2023-10-11  7:20 ` [PATCH 2/9] lei_xsearch: improve curl progress reporting Eric Wong
@ 2023-10-11  7:20 ` Eric Wong
  2023-10-11  7:20 ` [PATCH 4/9] msgtime: simplify msg_timestamp and msg_datestamp Eric Wong
                   ` (5 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2023-10-11  7:20 UTC (permalink / raw)
  To: meta

In retrospect, warning about bad times and dates is pointless
since there's nothing actionable about it.  We'll also drop an
unnecessary capture in msg_received_at while we're at it and
favor using $eml as the input variable name to match current
usage.

The note to install Date::Parse as a fallback remains since it
can be helpful in some cases (and is actionable by the user).
---
 lib/PublicInbox/MsgTime.pm | 33 +++++++++++----------------------
 1 file changed, 11 insertions(+), 22 deletions(-)

diff --git a/lib/PublicInbox/MsgTime.pm b/lib/PublicInbox/MsgTime.pm
index 5ee087fd..58b0deae 100644
--- a/lib/PublicInbox/MsgTime.pm
+++ b/lib/PublicInbox/MsgTime.pm
@@ -1,11 +1,11 @@
-# Copyright (C) 2018-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # Various date/time-related functions
 package PublicInbox::MsgTime;
+use v5.10.1; # unicode_strings in 5.12 may not work...
 use strict;
-use warnings;
-use base qw(Exporter);
+use parent qw(Exporter);
 our @EXPORT_OK = qw(msg_timestamp msg_datestamp);
 use Time::Local qw(timegm);
 my @MoY = qw(january february march april may june
@@ -125,10 +125,7 @@ sub str2date_zone ($) {
 	# but we want to keep "git fsck" happy.
 	# "-1200" is the furthest westermost zone offset,
 	# but git fast-import is liberal so we use "-1400"
-	if ($zone >= 1400 || $zone <= -1400) {
-		warn "bogus TZ offset: $zone, ignoring and assuming +0000\n";
-		$zone = '+0000';
-	}
+	$zone = '+0000' if $zone >= 1400 || $zone <= -1400;
 	[$ts, $zone];
 }
 
@@ -138,30 +135,22 @@ sub time_response ($) {
 }
 
 sub msg_received_at ($) {
-	my ($hdr) = @_; # PublicInbox::Eml
-	my @recvd = $hdr->header_raw('Received');
-	my ($ts);
-	foreach my $r (@recvd) {
+	my ($eml) = @_;
+	my $ts;
+	for my $r ($eml->header_raw('Received')) {
 		$r =~ /\s*([0-9]+\s+[a-zA-Z]+\s+[0-9]{2,4}\s+
 			[0-9]+[^0-9][0-9]+(?:[^0-9][0-9]+)
-			\s+([\+\-][0-9]+))/sx or next;
+			\s+(?:[\+\-][0-9]+))/sx or next;
 		$ts = eval { str2date_zone($1) } and return $ts;
-		my $mid = $hdr->header_raw('Message-ID');
-		warn "no date in $mid Received: $r\n";
 	}
 	undef;
 }
 
 sub msg_date_only ($) {
-	my ($hdr) = @_; # PublicInbox::Eml
-	my @date = $hdr->header_raw('Date');
-	my ($ts);
-	foreach my $d (@date) {
+	my ($eml) = @_;
+	my $ts;
+	for my $d ($eml->header_raw('Date')) {
 		$ts = eval { str2date_zone($d) } and return $ts;
-		if ($@) {
-			my $mid = $hdr->header_raw('Message-ID');
-			warn "bad Date: $d in $mid: $@\n";
-		}
 	}
 	undef;
 }

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 4/9] msgtime: simplify msg_timestamp and msg_datestamp
  2023-10-11  7:20 [PATCH 0/9] lei + import-related updates Eric Wong
                   ` (2 preceding siblings ...)
  2023-10-11  7:20 ` [PATCH 3/9] msgtime: quiet warnings we can do nothing about Eric Wong
@ 2023-10-11  7:20 ` Eric Wong
  2023-10-11  7:20 ` [PATCH 5/9] treewide: consolidate "From " line removal Eric Wong
                   ` (4 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2023-10-11  7:20 UTC (permalink / raw)
  To: meta

We don't need multiple return points nor multiple time_response
calls in either function.
---
 lib/PublicInbox/MsgTime.pm | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/lib/PublicInbox/MsgTime.pm b/lib/PublicInbox/MsgTime.pm
index 58b0deae..bbc9a007 100644
--- a/lib/PublicInbox/MsgTime.pm
+++ b/lib/PublicInbox/MsgTime.pm
@@ -157,20 +157,16 @@ sub msg_date_only ($) {
 
 # Favors Received header for sorting globally
 sub msg_timestamp ($;$) {
-	my ($hdr, $fallback) = @_; # PublicInbox::Eml
-	my $ret;
-	$ret = msg_received_at($hdr) and return time_response($ret);
-	$ret = msg_date_only($hdr) and return time_response($ret);
-	time_response([ $fallback // time, '+0000' ]);
+	my ($eml, $fallback) = @_;
+	time_response(msg_received_at($eml) // msg_date_only($eml) //
+			[ $fallback // time, '+0000' ]);
 }
 
 # Favors the Date: header for display and sorting within a thread
 sub msg_datestamp ($;$) {
-	my ($hdr, $fallback) = @_; # PublicInbox::Eml
-	my $ret;
-	$ret = msg_date_only($hdr) and return time_response($ret);
-	$ret = msg_received_at($hdr) and return time_response($ret);
-	time_response([ $fallback // time, '+0000' ]);
+	my ($eml, $fallback) = @_; # PublicInbox::Eml
+	time_response(msg_date_only($eml) // msg_received_at($eml) //
+			[ $fallback // time, '+0000' ]);
 }
 
 1;

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 5/9] treewide: consolidate "From " line removal
  2023-10-11  7:20 [PATCH 0/9] lei + import-related updates Eric Wong
                   ` (3 preceding siblings ...)
  2023-10-11  7:20 ` [PATCH 4/9] msgtime: simplify msg_timestamp and msg_datestamp Eric Wong
@ 2023-10-11  7:20 ` Eric Wong
  2023-10-11  7:20 ` [PATCH 6/9] import: switch to Unix stream socket for fast-import Eric Wong
                   ` (3 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2023-10-11  7:20 UTC (permalink / raw)
  To: meta

Aside from our prior import bugs (fixed in a0c07cba0e5d8b6a
(mda: drop leading "From " lines again, 2016-06-26)), we'll
always have to be dealing with mutt piping messages to us and
`git format-patch' output.  So just share the regexp so we
can use it everywhere.

It may even be desirable to allow importing messages with a
leading "From " line for FUSE.

Additionally, some instances of this regexp needlessly added
optional `\r?' (CR) checks ahead of the `\n' (LF) element; but
they're pointless anyway since [^\n]* already matches every
non-LF byte, including CR.
---
 lib/PublicInbox/Eml.pm        |  6 ++++++
 lib/PublicInbox/IMAP.pm       |  2 +-
 lib/PublicInbox/Import.pm     |  8 +++-----
 lib/PublicInbox/LeiInput.pm   |  5 +----
 lib/PublicInbox/LeiInspect.pm |  2 +-
 lib/PublicInbox/LeiToMail.pm  |  3 +--
 lib/PublicInbox/Mbox.pm       | 16 +++++++---------
 lib/PublicInbox/MboxReader.pm |  2 +-
 lib/PublicInbox/NNTP.pm       |  3 +--
 script/public-inbox-edit      |  5 ++---
 script/public-inbox-learn     |  2 +-
 script/public-inbox-mda       |  4 ++--
 script/public-inbox-purge     |  4 ++--
 13 files changed, 29 insertions(+), 33 deletions(-)

diff --git a/lib/PublicInbox/Eml.pm b/lib/PublicInbox/Eml.pm
index 8b999e1a..24060ec8 100644
--- a/lib/PublicInbox/Eml.pm
+++ b/lib/PublicInbox/Eml.pm
@@ -528,4 +528,10 @@ sub willneed { re_memo($_) for @_ }
 willneed(qw(From To Cc Date Subject Content-Type In-Reply-To References
 		Message-ID X-Alt-Message-ID));
 
+# This fixes an old bug from import (pre-a0c07cba0e5d8b6a)
+# mutt also pipes single RFC822 messages with a "From " line,
+# but no Content-Length or "From " escaping.
+# "git format-patch" also generates such files by default.
+sub strip_from { $_[0] =~ s/\A[\r\n]*From [^\n]*\n//s }
+
 1;
diff --git a/lib/PublicInbox/IMAP.pm b/lib/PublicInbox/IMAP.pm
index 3c64cefa..e4a9e304 100644
--- a/lib/PublicInbox/IMAP.pm
+++ b/lib/PublicInbox/IMAP.pm
@@ -664,7 +664,7 @@ sub op_eml_new { $_[4] = PublicInbox::Eml->new($_[3]) }
 # s/From / fixes old bug from import (pre-a0c07cba0e5d8b6a)
 sub to_crlf_full {
 	${$_[0]} =~ s/(?<!\r)\n/\r\n/sg;
-	${$_[0]} =~ s/\A[\r\n]*From [^\r\n]*\r\n//s;
+	PublicInbox::Eml::strip_from(${$_[0]});
 }
 
 sub op_crlf_bref { to_crlf_full($_[3]) }
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 7175884c..cd03da05 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -118,9 +118,6 @@ sub _cat_blob ($$$) {
 	$n == $len or croak "cat-blob: short read: $n < $len";
 	my $lf = chop $buf;
 	croak "bad read on final byte: <$lf>" if $lf ne "\n";
-
-	# fixup some bugginess in old versions:
-	$buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
 	\$buf;
 }
 
@@ -136,8 +133,9 @@ sub check_remove_v1 {
 	my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef);
 	$info =~ m!\A100644 blob ([a-f0-9]{40,})\t!s or die "not blob: $info";
 	my $oid = $1;
-	my $msg = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed";
-	my $cur = PublicInbox::Eml->new($msg);
+	my $bref = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed";
+	PublicInbox::Eml::strip_from($$bref);
+	my $cur = PublicInbox::Eml->new($bref);
 	my $cur_s = $cur->header('Subject') // '';
 	my $cur_m = $mime->header('Subject') // '';
 	if ($cur_s ne $cur_m || norm_body($cur) ne norm_body($mime)) {
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index 93f8b6b8..28b73ca9 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -84,10 +84,7 @@ sub input_fh {
 			return $self->{lei}->child_error(0, <<"");
 error reading $name: $!
 
-		# mutt pipes single RFC822 messages with a "From " line,
-		# but no Content-Length or "From " escaping.
-		# "git format-patch" also generates such files by default.
-		$buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+		PublicInbox::Eml::strip_from($buf);
 
 		# a user may feed just a body: git diff | lei rediff -U9
 		if ($self->{-force_eml}) {
diff --git a/lib/PublicInbox/LeiInspect.pm b/lib/PublicInbox/LeiInspect.pm
index f801610f..65c64cf2 100644
--- a/lib/PublicInbox/LeiInspect.pm
+++ b/lib/PublicInbox/LeiInspect.pm
@@ -254,7 +254,7 @@ sub inspect_start ($$) {
 sub do_inspect { # lei->do_env cb
 	my ($lei) = @_;
 	my $str = delete $lei->{istr};
-	$str =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+	PublicInbox::Eml::strip_from($str);
 	my $eml = PublicInbox::Eml->new(\$str);
 	inspect_start($lei, [ 'blob:'.$lei->git_oid($eml)->hexdigest,
 			map { "mid:$_" } @{mids($eml)} ]);
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 8771592d..ead60b38 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -53,8 +53,7 @@ sub _mbox_hdr_buf ($$$) {
 	}
 	my $buf = delete $eml->{hdr};
 
-	# fixup old bug from import (pre-a0c07cba0e5d8b6a)
-	$$buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+	PublicInbox::Eml::strip_from($$buf);
 	my $ident = $smsg->{blob} // 'lei';
 	if (defined(my $pct = $smsg->{pct})) { $ident .= "=$pct" }
 
diff --git a/lib/PublicInbox/Mbox.pm b/lib/PublicInbox/Mbox.pm
index bf61bb0e..52f88ae3 100644
--- a/lib/PublicInbox/Mbox.pm
+++ b/lib/PublicInbox/Mbox.pm
@@ -89,17 +89,15 @@ sub emit_raw {
 
 sub msg_hdr ($$) {
 	my ($ctx, $eml) = @_;
-	my $header_obj = $eml->header_obj;
 
-	# drop potentially confusing headers, ssoma already should've dropped
-	# Lines and Content-Length
-	foreach my $d (qw(Lines Bytes Content-Length Status)) {
-		$header_obj->header_set($d);
+	# drop potentially confusing headers, various importers should've
+	# already dropped these, but we can't trust stuff we've cloned
+	for my $d (qw(Lines Bytes Content-Length Status)) {
+		$eml->header_set($d);
 	}
-	my $crlf = $header_obj->crlf;
-	my $buf = $header_obj->as_string;
-	# fixup old bug from import (pre-a0c07cba0e5d8b6a)
-	$buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+	my $crlf = $eml->crlf;
+	my $buf = $eml->header_obj->as_string;
+	PublicInbox::Eml::strip_from($buf);
 	"From mboxrd\@z Thu Jan  1 00:00:00 1970" . $crlf . $buf . $crlf;
 }
 
diff --git a/lib/PublicInbox/MboxReader.pm b/lib/PublicInbox/MboxReader.pm
index e4209022..d67fb4eb 100644
--- a/lib/PublicInbox/MboxReader.pm
+++ b/lib/PublicInbox/MboxReader.pm
@@ -93,7 +93,7 @@ sub _mbox_cl ($$$;@) {
 			undef $mbfh;
 		}
 		while (my $hdr = _extract_hdr(\$buf)) {
-			$$hdr =~ s/\A[\r\n]*From [^\n]*\n//s or
+			PublicInbox::Eml::strip_from($$hdr) or
 				die "E: no 'From ' line in:\n", Dumper($hdr);
 			my $eml = PublicInbox::Eml->new($hdr);
 			next unless $eml->raw_size;
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 316b7775..603cf094 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -523,8 +523,7 @@ sub msg_hdr_write ($$) {
 	set_nntp_headers($eml, $smsg);
 
 	my $hdr = $eml->{hdr} // \(my $x = '');
-	# fixup old bug from import (pre-a0c07cba0e5d8b6a)
-	$$hdr =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+	PublicInbox::Eml::strip_from($$hdr);
 	$$hdr =~ s/(?<!\r)\n/\r\n/sg; # Alpine barfs without this
 
 	# for leafnode compatibility, we need to ensure Message-ID headers
diff --git a/script/public-inbox-edit b/script/public-inbox-edit
index 1fbaf5a7..1fb6f32b 100755
--- a/script/public-inbox-edit
+++ b/script/public-inbox-edit
@@ -1,5 +1,5 @@
 #!/usr/bin/perl -w
-# Copyright (C) 2019-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # Used for editing messages in a public-inbox.
@@ -188,8 +188,7 @@ retry_edit:
 		"read $edit_fn: $!\n";
 
 	if (!$opt->{raw}) {
-		# get rid of the From we added
-		$new_raw =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+		PublicInbox::Eml::strip_from($new_raw);
 
 		# check if user forgot to purge (in mutt) after editing
 		if ($new_raw =~ /^From /sm) {
diff --git a/script/public-inbox-learn b/script/public-inbox-learn
index 8b8e1b77..6e1978a7 100755
--- a/script/public-inbox-learn
+++ b/script/public-inbox-learn
@@ -40,7 +40,7 @@ my $pi_cfg = PublicInbox::Config->new;
 my $err;
 my $mime = PublicInbox::Eml->new(do{
 	defined(my $data = do { local $/; <STDIN> }) or die "read STDIN: $!\n";
-	$data =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+	PublicInbox::Eml::strip_from($data);
 
 	if ($train ne 'rm') {
 		eval {
diff --git a/script/public-inbox-mda b/script/public-inbox-mda
index ba498956..cac819ac 100755
--- a/script/public-inbox-mda
+++ b/script/public-inbox-mda
@@ -1,5 +1,5 @@
 #!/usr/bin/perl -w
-# Copyright (C) 2013-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # Mail delivery agent for public-inbox, run from your MTA upon mail delivery
@@ -39,7 +39,7 @@ use PublicInbox::Spamcheck;
 my $emergency = $ENV{PI_EMERGENCY} || "$ENV{HOME}/.public-inbox/emergency/";
 $ems = PublicInbox::Emergency->new($emergency);
 my $str = do { local $/; <STDIN> };
-$str =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+PublicInbox::Eml::strip_from($str);
 $ems->prepare(\$str);
 my $eml = PublicInbox::Eml->new(\$str);
 my $cfg = PublicInbox::Config->new;
diff --git a/script/public-inbox-purge b/script/public-inbox-purge
index 121027cc..8f9b0b16 100755
--- a/script/public-inbox-purge
+++ b/script/public-inbox-purge
@@ -1,5 +1,5 @@
 #!/usr/bin/perl -w
-# Copyright (C) 2019-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # Used for purging messages entirely from a public-inbox.  Currently
@@ -34,7 +34,7 @@ my @ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV, $opt);
 PublicInbox::AdminEdit::check_editable(\@ibxs);
 
 defined(my $data = do { local $/; <STDIN> }) or die "read STDIN: $!\n";
-$data =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+PublicInbox::Eml::strip_from($data);
 my $n_purged = 0;
 
 foreach my $ibx (@ibxs) {

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 6/9] import: switch to Unix stream socket for fast-import
  2023-10-11  7:20 [PATCH 0/9] lei + import-related updates Eric Wong
                   ` (4 preceding siblings ...)
  2023-10-11  7:20 ` [PATCH 5/9] treewide: consolidate "From " line removal Eric Wong
@ 2023-10-11  7:20 ` Eric Wong
  2023-10-11  7:20 ` [PATCH 7/9] import: cat_blob is a no-op w/o live fast-import Eric Wong
                   ` (2 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2023-10-11  7:20 UTC (permalink / raw)
  To: meta

We use fewer file descriptors and fewer lines of code this way.
I'm not aware of any place we rely on POSIX pipe semantics with
`git fast-import', and sockets have bigger buffers by default
in most cases (even if Linux allows larger pipe buffers).
---
 lib/PublicInbox/Import.pm   | 132 +++++++++++++++++-------------------
 script/public-inbox-convert |  18 ++---
 2 files changed, 71 insertions(+), 79 deletions(-)

diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index cd03da05..894ba818 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -8,7 +8,7 @@
 package PublicInbox::Import;
 use v5.12;
 use parent qw(PublicInbox::Lock);
-use PublicInbox::Spawn qw(run_die popen_rd);
+use PublicInbox::Spawn qw(run_die popen_rd spawn);
 use PublicInbox::MID qw(mids mid2path);
 use PublicInbox::Address;
 use PublicInbox::Smsg;
@@ -16,9 +16,11 @@ use PublicInbox::MsgTime qw(msg_datestamp);
 use PublicInbox::ContentHash qw(content_digest);
 use PublicInbox::MDA;
 use PublicInbox::Eml;
+use PublicInbox::ProcessIO;
 use POSIX qw(strftime);
-use autodie qw(read close);
+use autodie qw(read close socketpair);
 use Carp qw(croak);
+use Socket qw(AF_UNIX SOCK_STREAM);
 
 sub default_branch () {
 	state $default_branch = do {
@@ -56,11 +58,10 @@ sub new {
 # idempotent start function
 sub gfi_start {
 	my ($self) = @_;
-
-	return ($self->{in}, $self->{out}) if $self->{in};
-
-	my ($in_r, $out_r, $out_w);
-	pipe($out_r, $out_w) or die "pipe failed: $!";
+	my $io = $self->{io};
+	return $io if $io;
+	socketpair($io, my $s2, AF_UNIX, SOCK_STREAM, 0);
+	$io->autoflush(1);
 
 	$self->lock_acquire;
 	eval {
@@ -73,18 +74,17 @@ sub gfi_start {
 			die "fatal: ls-tree -r -z --name-only $ref: \$?=$?" if $?;
 			$self->{-tree} = { map { $_ => 1 } split(/\0/, $t) };
 		}
-		$in_r = $self->{in} = $git->popen(qw(fast-import
-					--quiet --done --date-format=raw),
-					undef, { 0 => $out_r });
-		$out_w->autoflush(1);
-		$self->{out} = $out_w;
+		my $gfi = [ 'git', "--git-dir=$git->{git_dir}", qw(fast-import
+				--quiet --done --date-format=raw) ];
+		my $pid = spawn($gfi, undef, { 0 => $s2, 1 => $s2 });
 		$self->{nchg} = 0;
+		$self->{io} = PublicInbox::ProcessIO->maybe_new($pid, $io);
 	};
 	if ($@) {
 		$self->lock_release;
 		die $@;
 	}
-	($in_r, $out_w);
+	$self->{io};
 }
 
 sub wfail () { die "write to fast-import failed: $!" }
@@ -99,22 +99,22 @@ sub norm_body ($) {
 }
 
 # only used for v1 (ssoma) inboxes
-sub _check_path ($$$$) {
-	my ($r, $w, $tip, $path) = @_;
+sub _check_path ($$$) {
+	my ($io, $tip, $path) = @_;
 	return if $tip eq '';
-	print $w "ls $tip $path\n" or wfail;
+	print $io "ls $tip $path\n" or wfail;
 	local $/ = "\n";
-	my $info = <$r> // die "EOF from fast-import: $!";
+	my $info = <$io> // die "EOF from fast-import: $!";
 	$info =~ /\Amissing / ? undef : $info;
 }
 
-sub _cat_blob ($$$) {
-	my ($r, $w, $oid) = @_;
-	print $w "cat-blob $oid\n" or wfail;
+sub _cat_blob ($$) {
+	my ($io, $oid) = @_;
+	print $io "cat-blob $oid\n" or wfail;
 	local $/ = "\n";
-	my $info = <$r> // die "EOF from fast-import / cat-blob: $!";
+	my $info = <$io> // die "EOF from fast-import / cat-blob: $!";
 	$info =~ /\A[a-f0-9]{40,} blob ([0-9]+)\n\z/ or return;
-	my $n = read($r, my $buf, my $len = $1 + 1);
+	my $n = read($io, my $buf, my $len = $1 + 1);
 	$n == $len or croak "cat-blob: short read: $n < $len";
 	my $lf = chop $buf;
 	croak "bad read on final byte: <$lf>" if $lf ne "\n";
@@ -123,17 +123,16 @@ sub _cat_blob ($$$) {
 
 sub cat_blob {
 	my ($self, $oid) = @_;
-	my ($r, $w) = $self->gfi_start;
-	_cat_blob($r, $w, $oid);
+	_cat_blob(gfi_start($self), $oid);
 }
 
 sub check_remove_v1 {
-	my ($r, $w, $tip, $path, $mime) = @_;
+	my ($io, $tip, $path, $mime) = @_;
 
-	my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef);
+	my $info = _check_path($io, $tip, $path) or return ('MISSING',undef);
 	$info =~ m!\A100644 blob ([a-f0-9]{40,})\t!s or die "not blob: $info";
 	my $oid = $1;
-	my $bref = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed";
+	my $bref = _cat_blob($io, $oid) or die "BUG: cat-blob $1 failed";
 	PublicInbox::Eml::strip_from($$bref);
 	my $cur = PublicInbox::Eml->new($bref);
 	my $cur_s = $cur->header('Subject') // '';
@@ -146,16 +145,15 @@ sub check_remove_v1 {
 
 sub checkpoint {
 	my ($self) = @_;
-	return unless $self->{in};
-	print { $self->{out} } "checkpoint\n" or wfail;
+	print { $self->{io} // return } "checkpoint\n" or wfail;
 	undef;
 }
 
 sub progress {
 	my ($self, $msg) = @_;
-	return unless $self->{in};
-	print { $self->{out} } "progress $msg\n" or wfail;
-	readline($self->{in}) eq "progress $msg\n" or die
+	my $io = $self->{io} or return;
+	print $io "progress $msg\n" or wfail;
+	readline($io) eq "progress $msg\n" or die
 		"progress $msg not received\n";
 	undef;
 }
@@ -205,10 +203,9 @@ sub barrier {
 # used for v2
 sub get_mark {
 	my ($self, $mark) = @_;
-	die "not active\n" unless $self->{in};
-	my ($r, $w) = $self->gfi_start;
-	print $w "get-mark $mark\n" or wfail;
-	my $oid = <$r> // die "get-mark failed, need git 2.6.0+\n";
+	my $io = $self->{io} or croak "not active\n";
+	print $io "get-mark $mark\n" or wfail;
+	my $oid = <$io> // die "get-mark failed, need git 2.6.0+\n";
 	chomp($oid);
 	$oid;
 }
@@ -225,11 +222,11 @@ sub remove {
 	my $path_type = $self->{path_type};
 	my ($path, $err, $cur, $blob);
 
-	my ($r, $w) = $self->gfi_start;
+	my $io = gfi_start($self);
 	my $tip = $self->{tip};
 	if ($path_type eq '2/38') {
 		$path = mid2path(v1_mid0($mime));
-		($err, $cur) = check_remove_v1($r, $w, $tip, $path, $mime);
+		($err, $cur) = check_remove_v1($io, $tip, $path, $mime);
 		return ($err, $cur) if $err;
 	} else {
 		my $sref;
@@ -241,7 +238,7 @@ sub remove {
 		}
 		my $len = length($$sref);
 		$blob = $self->{mark}++;
-		print $w "blob\nmark :$blob\ndata $len\n",
+		print $io "blob\nmark :$blob\ndata $len\n",
 			$$sref, "\n" or wfail;
 	}
 
@@ -249,22 +246,22 @@ sub remove {
 	my $commit = $self->{mark}++;
 	my $parent = $tip =~ /\A:/ ? $tip : undef;
 	unless ($parent) {
-		print $w "reset $ref\n" or wfail;
+		print $io "reset $ref\n" or wfail;
 	}
 	my $ident = $self->{ident};
 	my $now = now_raw();
 	$msg //= 'rm';
 	my $len = length($msg) + 1;
-	print $w "commit $ref\nmark :$commit\n",
+	print $io "commit $ref\nmark :$commit\n",
 		"author $ident $now\n",
 		"committer $ident $now\n",
 		"data $len\n$msg\n\n",
 		'from ', ($parent ? $parent : $tip), "\n" or wfail;
 	if (defined $path) {
-		print $w "D $path\n\n" or wfail;
+		print $io "D $path\n\n" or wfail;
 	} else {
-		clean_tree_v2($self, $w, 'd');
-		print $w "M 100644 :$blob d\n\n" or wfail;
+		clean_tree_v2($self, $io, 'd');
+		print $io "M 100644 :$blob d\n\n" or wfail;
 	}
 	$self->{nchg}++;
 	(($self->{tip} = ":$commit"), $cur);
@@ -354,11 +351,11 @@ sub v1_mid0 ($) {
 	$mids->[0];
 }
 sub clean_tree_v2 ($$$) {
-	my ($self, $w, $keep) = @_;
+	my ($self, $io, $keep) = @_;
 	my $tree = $self->{-tree} or return; #v2 only
 	delete $tree->{$keep};
 	foreach (keys %$tree) {
-		print $w "D $_\n" or wfail;
+		print $io "D $_\n" or wfail;
 	}
 	%$tree = ($keep => 1);
 }
@@ -377,10 +374,10 @@ sub add {
 		$path = 'm';
 	}
 
-	my ($r, $w) = $self->gfi_start;
+	my $io = gfi_start($self);
 	my $tip = $self->{tip};
 	if ($path_type eq '2/38') {
-		_check_path($r, $w, $tip, $path) and return;
+		_check_path($io, $tip, $path) and return;
 	}
 
 	drop_unwanted_headers($mime);
@@ -394,8 +391,7 @@ sub add {
 	my $raw_email = $mime->{-public_inbox_raw} // $mime->as_string;
 	my $n = length($raw_email);
 	$self->{bytes_added} += $n;
-	print $w "blob\nmark :$blob\ndata ", $n, "\n" or wfail;
-	print $w $raw_email, "\n" or wfail;
+	print $io "blob\nmark :$blob\ndata $n\n", $raw_email, "\n" or wfail;
 
 	# v2: we need this for Xapian
 	if ($smsg) {
@@ -422,19 +418,19 @@ sub add {
 	my $parent = $tip =~ /\A:/ ? $tip : undef;
 
 	unless ($parent) {
-		print $w "reset $ref\n" or wfail;
+		print $io "reset $ref\n" or wfail;
 	}
 
-	print $w "commit $ref\nmark :$commit\n",
+	print $io "commit $ref\nmark :$commit\n",
 		"author $author $at\n",
-		"committer $self->{ident} $ct\n" or wfail;
-	print $w "data ", (length($subject) + 1), "\n",
+		"committer $self->{ident} $ct\n",
+		"data ", (length($subject) + 1), "\n",
 		$subject, "\n\n" or wfail;
 	if ($tip ne '') {
-		print $w 'from ', ($parent ? $parent : $tip), "\n" or wfail;
+		print $io 'from ', ($parent ? $parent : $tip), "\n" or wfail;
 	}
-	clean_tree_v2($self, $w, $path);
-	print $w "M 100644 :$blob $path\n\n" or wfail;
+	clean_tree_v2($self, $io, $path);
+	print $io "M 100644 :$blob $path\n\n" or wfail;
 	$self->{nchg}++;
 	$self->{tip} = ":$commit";
 }
@@ -475,15 +471,14 @@ EOM
 }
 
 # true if locked and active
-sub active { !!$_[0]->{out} }
+sub active { !!$_[0]->{io} }
 
 sub done {
 	my ($self) = @_;
-	my $w = delete $self->{out} or return;
+	my $io = delete $self->{io} or return;
 	eval {
-		my $r = delete $self->{in} or die 'BUG: missing {in} when done';
-		print $w "done\n" or wfail;
-		close $r;
+		print $io "done\n" or wfail;
+		close $io; # reaps and dies on error
 	};
 	my $wait_err = $@;
 	my $nchg = delete $self->{nchg};
@@ -496,10 +491,7 @@ sub done {
 	die $wait_err if $wait_err;
 }
 
-sub atfork_child {
-	my ($self) = @_;
-	close($_) for (grep defined, delete(@$self{qw(in out)}));
-}
+sub atfork_child { close(delete($_[0]->{io}) // return) }
 
 sub digest2mid ($$;$) {
 	my ($dig, $hdr, $fallback_time) = @_;
@@ -552,7 +544,7 @@ sub replace_oids {
 	my $git = $self->{git};
 	my @export = (qw(fast-export --no-data --use-done-feature), $old);
 	my $rd = $git->popen(@export);
-	my ($r, $w) = $self->gfi_start;
+	my $io = gfi_start($self);
 	my @buf;
 	my $nreplace = 0;
 	my @oids;
@@ -563,7 +555,7 @@ sub replace_oids {
 			push @buf, "reset $tmp\n";
 		} elsif (/^commit (?:.+)/) {
 			if (@buf) {
-				print $w @buf or wfail;
+				print $io @buf or wfail;
 				@buf = ();
 			}
 			push @buf, "commit $tmp\n";
@@ -599,7 +591,7 @@ sub replace_oids {
 				rewrite_commit($self, \@oids, \@buf, $mime);
 				$nreplace++;
 			}
-			print $w @buf, "\n" or wfail;
+			print $io @buf, "\n" or wfail;
 			@buf = ();
 		} elsif ($_ eq "done\n") {
 			$done = 1;
@@ -612,7 +604,7 @@ sub replace_oids {
 	}
 	close $rd;
 	if (@buf) {
-		print $w @buf or wfail;
+		print $io @buf or wfail;
 	}
 	die 'done\n not seen from fast-export' unless $done;
 	chomp(my $cmt = $self->get_mark(":$mark")) if $nreplace;
diff --git a/script/public-inbox-convert b/script/public-inbox-convert
index 780f7194..0cc52777 100755
--- a/script/public-inbox-convert
+++ b/script/public-inbox-convert
@@ -120,7 +120,7 @@ my $head = $old->{ref_head} || 'HEAD';
 my $rd = $old->git->popen(qw(fast-export --use-done-feature), $head);
 $v2w->idx_init($opt);
 my $im = $v2w->importer;
-my ($r, $w) = $im->gfi_start;
+my $io = $im->gfi_start;
 my $h = '[0-9a-f]';
 my %D;
 my $last;
@@ -131,12 +131,12 @@ while (<$rd>) {
 		$state = 'commit';
 	} elsif (/^data ([0-9]+)/) {
 		my $len = $1;
-		print $w $_ or $im->wfail;
+		print $io $_ or $im->wfail;
 		while ($len) {
 			my $n = read($rd, my $tmp, $len) or die "read: $!";
 			warn "$n != $len\n" if $n != $len;
 			$len -= $n;
-			print $w $tmp or $im->wfail;
+			print $io $tmp or $im->wfail;
 		}
 		next;
 	} elsif ($state eq 'commit') {
@@ -144,9 +144,9 @@ while (<$rd>) {
 			my ($mark, $path) = ($1, $2);
 			$D{$path} = $mark;
 			if ($last && $last ne 'm') {
-				print $w "D $last\n" or $im->wfail;
+				print $io "D $last\n" or $im->wfail;
 			}
-			print $w "M 100644 :$mark m\n" or $im->wfail;
+			print $io "M 100644 :$mark m\n" or $im->wfail;
 			$last = 'm';
 			next;
 		}
@@ -154,18 +154,18 @@ while (<$rd>) {
 			my $mark = delete $D{$1};
 			defined $mark or die "undeleted path: $1\n";
 			if ($last && $last ne 'd') {
-				print $w "D $last\n" or $im->wfail;
+				print $io "D $last\n" or $im->wfail;
 			}
-			print $w "M 100644 :$mark d\n" or $im->wfail;
+			print $io "M 100644 :$mark d\n" or $im->wfail;
 			$last = 'd';
 			next;
 		}
 	}
 	last if $_ eq "done\n";
-	print $w $_ or $im->wfail;
+	print $io $_ or $im->wfail;
 }
 close $rd or die "fast-export: \$?=$? \$!=$!\n";
-$r = $w = undef; # v2w->done does the actual close and error checking
+$io = undef;
 $v2w->done;
 if (my $old_mm = $old->mm) {
 	$old->cleanup;

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 7/9] import: cat_blob is a no-op w/o live fast-import
  2023-10-11  7:20 [PATCH 0/9] lei + import-related updates Eric Wong
                   ` (5 preceding siblings ...)
  2023-10-11  7:20 ` [PATCH 6/9] import: switch to Unix stream socket for fast-import Eric Wong
@ 2023-10-11  7:20 ` Eric Wong
  2023-10-11  7:20 ` [PATCH 8/9] lei blob: run cat_blob on lei/store for pending blobs Eric Wong
  2023-10-11  7:20 ` [PATCH 9/9] lei import|tag|rm: support --commit-delay=SECONDS Eric Wong
  8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2023-10-11  7:20 UTC (permalink / raw)
  To: meta

cat_blob is a fallback for reading blobs which haven't yet made it
onto disk where `git cat-file' can read them.  Spawning a new
fast-import process just to retrieve a blob is therefore pointless:
a freshly spawned fast-import has no pending blobs, and cat_blob is
only used as a last resort when `git cat-file' fails.
---
 lib/PublicInbox/Import.pm | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 894ba818..6bb2c66d 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -123,7 +123,7 @@ sub _cat_blob ($$) {
 
 sub cat_blob {
 	my ($self, $oid) = @_;
-	_cat_blob(gfi_start($self), $oid);
+	_cat_blob($self->{io} // return, $oid);
 }
 
 sub check_remove_v1 {

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 8/9] lei blob: run cat_blob on lei/store for pending blobs
  2023-10-11  7:20 [PATCH 0/9] lei + import-related updates Eric Wong
                   ` (6 preceding siblings ...)
  2023-10-11  7:20 ` [PATCH 7/9] import: cat_blob is a no-op w/o live fast-import Eric Wong
@ 2023-10-11  7:20 ` Eric Wong
  2023-10-11  7:20 ` [PATCH 9/9] lei import|tag|rm: support --commit-delay=SECONDS Eric Wong
  8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2023-10-11  7:20 UTC (permalink / raw)
  To: meta

This can probably be made asynchronous in the future via
PublicInbox::InputPipe, but it's good enough for testing.
---
 lib/PublicInbox/LeiBlob.pm  | 16 ++++++++++------
 lib/PublicInbox/LeiStore.pm |  5 +++++
 2 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm
index 8df83b1d..d069d4a8 100644
--- a/lib/PublicInbox/LeiBlob.pm
+++ b/lib/PublicInbox/LeiBlob.pm
@@ -9,6 +9,7 @@ use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::Spawn qw(run_wait popen_rd which);
 use PublicInbox::DS;
+use PublicInbox::Eml;
 
 sub get_git_dir ($$) {
 	my ($lei, $d) = @_;
@@ -121,18 +122,21 @@ sub lei_blob {
 				'cat-file', 'blob', $blob ];
 		if (defined $lei->{-attach_idx}) {
 			my $fh = popen_rd($cmd, $lei->{env}, $rdr);
-			require PublicInbox::Eml;
 			my $buf = do { local $/; <$fh> };
 			return extract_attach($lei, $blob, \$buf) if close($fh);
 		}
 		$rdr->{1} = $lei->{1};
 		my $cerr = run_wait($cmd, $lei->{env}, $rdr) or return;
 		my $lms = $lei->lms;
-		if (my $bref = $lms ? $lms->local_blob($blob, 1) : undef) {
-			defined($lei->{-attach_idx}) and
-				return extract_attach($lei, $blob, $bref);
-			return $lei->out($$bref);
-		} elsif ($opt->{mail}) {
+		my $bref = ($lms ? $lms->local_blob($blob, 1) : undef) // do {
+			my $sto = $lei->{sto} // $lei->_lei_store;
+			$sto && $sto->{-wq_s1} ? $sto->wq_do('cat_blob', $blob)
+						: undef;
+		};
+		$bref and return $lei->{-attach_idx} ?
+					extract_attach($lei, $blob, $bref) :
+					$lei->out($$bref);
+		if ($opt->{mail}) {
 			my $eh = $rdr->{2};
 			seek($eh, 0, 0);
 			return $lei->child_error($cerr, do { local $/; <$eh> });
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index e19ec88e..9c07af14 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -108,6 +108,11 @@ sub search {
 	PublicInbox::LeiSearch->new($_[0]->{priv_eidx}->{topdir});
 }
 
+sub cat_blob {
+	my ($self, $oid) = @_;
+	$self->{im} ? $self->{im}->cat_blob($oid) : undef;
+}
+
 # follows the stderr file
 sub _tail_err {
 	my ($self) = @_;

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH 9/9] lei import|tag|rm: support --commit-delay=SECONDS
  2023-10-11  7:20 [PATCH 0/9] lei + import-related updates Eric Wong
                   ` (7 preceding siblings ...)
  2023-10-11  7:20 ` [PATCH 8/9] lei blob: run cat_blob on lei/store for pending blobs Eric Wong
@ 2023-10-11  7:20 ` Eric Wong
  8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2023-10-11  7:20 UTC (permalink / raw)
  To: meta

Delayed commits allow users to trade off immediate safety for
throughput and reduced storage wear when running multiple
discrete commands.

This feature is currently useful for providing a way to make
t/lei-store-fail.t reliable and for ensuring `lei blob' can
retrieve messages which have not yet been committed.

In the future, it'll also be useful for the FUSE layer to batch
git activity.
---
 lib/PublicInbox/LEI.pm      | 23 ++++++++++++++---------
 lib/PublicInbox/LeiStore.pm |  6 ++++++
 t/lei-import.t              | 13 +++++++++++++
 t/lei-store-fail.t          | 20 +++++++++++++-------
 t/lei-tag.t                 | 15 ++++++++++++++-
 5 files changed, 60 insertions(+), 17 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index e2b3c0d9..af39f8af 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -231,13 +231,13 @@ our %CMD = ( # sorted in order of importance/use:
 'rm' => [ '--stdin|LOCATION...',
 	'remove a message from the index and prevent reindexing',
 	'stdin|', # /|\z/ must be first for lone dash
-	qw(in-format|F=s lock=s@), @net_opt, @c_opt ],
+	qw(in-format|F=s lock=s@ commit-delay=i), @net_opt, @c_opt ],
 'plonk' => [ '--threads|--from=IDENT',
 	'exclude mail matching From: or threads from non-Message-ID searches',
 	qw(stdin| threads|t from|f=s mid=s oid=s), @c_opt ],
-'tag' => [ 'KEYWORDS... LOCATION...|--stdin',
+tag => [ 'KEYWORDS... LOCATION...|--stdin',
 	'set/unset keywords and/or labels on message(s)',
-	qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@),
+	qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@ commit-delay=i),
 	@net_opt, @c_opt, pass_through('-kw:foo for delete') ],
 
 'purge-mailsource' => [ 'LOCATION|--all',
@@ -262,10 +262,11 @@ our %CMD = ( # sorted in order of importance/use:
 	qw(in-format|F=s kw! offset=i recursive|r exclude=s include|I=s
 	verbose|v+ incremental!), @net_opt, # mainly for --proxy=
 	 @c_opt ],
-'import' => [ 'LOCATION...|--stdin [LABELS...]',
+import => [ 'LOCATION...|--stdin [LABELS...]',
 	'one-time import/update from URL or filesystem',
 	qw(stdin| offset=i recursive|r exclude=s include|I=s new-only
-	lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!),
+	lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!
+	commit-delay=i),
 	@net_opt, @c_opt ],
 'forget-mail-sync' => [ 'LOCATION...',
 	'forget sync information for a mail folder', @c_opt ],
@@ -1539,10 +1540,14 @@ sub sto_done_request {
 	my ($lei, $wq) = @_;
 	return unless $lei->{sto} && $lei->{sto}->{-wq_s1};
 	local $current_lei = $lei;
-	my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock};
-	my $errfh = $lei->{2} // *STDERR{GLOB};
-	my @io = $s ? ($errfh, $s) : ($errfh);
-	eval { $lei->{sto}->wq_io_do('done', \@io) };
+	if (my $n = $lei->{opt}->{'commit-delay'}) {
+		eval { $lei->{sto}->wq_do('schedule_commit', $n) };
+	} else {
+		my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock};
+		my $errfh = $lei->{2} // *STDERR{GLOB};
+		my @io = $s ? ($errfh, $s) : ($errfh);
+		eval { $lei->{sto}->wq_io_do('done', \@io) };
+	}
 	warn($@) if $@;
 }
 
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 9c07af14..aebb85a9 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -34,6 +34,7 @@ use Sys::Syslog qw(syslog openlog);
 use Errno qw(EEXIST ENOENT);
 use PublicInbox::Syscall qw(rename_noreplace);
 use PublicInbox::LeiStoreErr;
+use PublicInbox::DS qw(add_uniq_timer);
 
 sub new {
 	my (undef, $dir, $opt) = @_;
@@ -113,6 +114,11 @@ sub cat_blob {
 	$self->{im} ? $self->{im}->cat_blob($oid) : undef;
 }
 
+sub schedule_commit {
+	my ($self, $sec) = @_;
+	add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&done, $self);
+}
+
 # follows the stderr file
 sub _tail_err {
 	my ($self) = @_;
diff --git a/t/lei-import.t b/t/lei-import.t
index 8b09d3aa..b2c1de9b 100644
--- a/t/lei-import.t
+++ b/t/lei-import.t
@@ -2,6 +2,7 @@
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 use v5.12; use PublicInbox::TestCommon;
+use PublicInbox::DS qw(now);
 use autodie qw(open close);
 test_lei(sub {
 ok(!lei(qw(import -F bogus), 't/plack-qp.eml'), 'fails with bogus format');
@@ -141,6 +142,18 @@ $res = json_utf8->decode($lei_out);
 is_deeply($res->[0]->{kw}, [qw(answered flagged seen)], 'keyword added');
 is_deeply($res->[0]->{L}, [qw(boombox inbox)], 'labels preserved');
 
+lei_ok qw(import --commit-delay=1 +L:bin -F eml t/data/binary.patch);
+lei_ok 'ls-label';
+unlike($lei_out, qr/\bbin\b/, 'commit-delay delays label');
+my $end = now + 10;
+my $n = 1;
+diag 'waiting for lei/store commit...';
+do {
+	tick $n;
+	$n = 0.1;
+} until (!lei('ls-label') || $lei_out =~ /\bbin\b/ || now > $end);
+like($lei_out, qr/\bbin\b/, 'commit-delay eventually commits');
+
 # see t/lei_to_mail.t for "import -F mbox*"
 });
 done_testing;
diff --git a/t/lei-store-fail.t b/t/lei-store-fail.t
index fb0f2b75..c2f03148 100644
--- a/t/lei-store-fail.t
+++ b/t/lei-store-fail.t
@@ -9,8 +9,11 @@ use Fcntl qw(SEEK_SET);
 use File::Path qw(remove_tree);
 
 my $start_home = $ENV{HOME}; # bug guard
+my $utf8_oid = '9bf1002c49eb075df47247b74d69bcd555e23422';
 test_lei(sub {
 	lei_ok qw(import -q t/plack-qp.eml); # start the store
+	ok(!lei(qw(blob --mail), $utf8_oid), 't/utf8.eml not imported, yet');
+
 	my $opt;
 	pipe($opt->{0}, my $in_w);
 	open $opt->{1}, '+>', undef;
@@ -20,27 +23,30 @@ test_lei(sub {
 	my $tp = start_script($cmd, undef, $opt);
 	close $opt->{0};
 	$in_w->autoflush(1);
-	for (1..500) { # need to fill up 64k read buffer
-		print $in_w <<EOM or xbail "print $!";
+	print $in_w <<EOM or xbail "print: $!";
 From k\@y Fri Oct  2 00:00:00 1993
 From: <k\@example.com>
 Date: Sat, 02 Oct 2010 00:00:00 +0000
 Subject: hi
-Message-ID: <$_\@t>
+Message-ID: <0\@t>
 
 will this save?
 EOM
-	}
-	tick 0.2; # XXX ugh, this is so hacky
+	# import another message w/ delay while mboxrd import is still running
+	lei_ok qw(import -q --commit-delay=300 t/utf8.eml);
+	lei_ok qw(blob --mail), $utf8_oid,
+		\'blob immediately available despite --commit-delay';
+	lei_ok qw(q m:testmessage@example.com);
+	is($lei_out, "[null]\n", 'delayed commit is unindexed');
 
-	# make sto_done_request fail:
+	# make immediate ->sto_done_request fail from mboxrd import:
 	remove_tree("$ENV{HOME}/.local/share/lei/store");
 	# subsequent lei commands are undefined behavior,
 	# but we need to make sure the current lei command fails:
 
 	close $in_w; # should trigger ->done
 	$tp->join;
-	isnt($?, 0, 'lei import error code set on failure');
+	isnt($?, 0, 'lei import -F mboxrd error code set on failure');
 	is(-s $opt->{1}, 0, 'nothing in stdout');
 	isnt(-s $opt->{2}, 0, 'stderr not empty');
 	seek($opt->{2}, 0, SEEK_SET);
diff --git a/t/lei-tag.t b/t/lei-tag.t
index cccf0af6..7278dfcd 100644
--- a/t/lei-tag.t
+++ b/t/lei-tag.t
@@ -1,9 +1,10 @@
 #!perl -w
 # Copyright (C) 2021 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict; use v5.10.1; use PublicInbox::TestCommon;
+use v5.12; use PublicInbox::TestCommon;
 require_git 2.6;
 require_mods(qw(json DBD::SQLite Xapian));
+use PublicInbox::DS qw(now);
 my ($ro_home, $cfg_path) = setup_public_inboxes;
 my $check_kw = sub {
 	my ($exp, %opt) = @_;
@@ -104,5 +105,17 @@ test_lei(sub {
 	lei_ok qw(tag +L:nope -F eml t/data/binary.patch);
 	like $lei_err, qr/\b1 unimported messages/, 'noted unimported'
 		or diag $lei_err;
+
+	lei_ok qw(tag -F eml --commit-delay=1 t/utf8.eml +L:utf8);
+	lei_ok 'ls-label';
+	unlike($lei_out, qr/\butf8\b/, 'commit-delay delays label');
+	my $end = now + 10;
+	my $n = 1;
+	diag 'waiting for lei/store commit...';
+	do {
+		tick $n;
+		$n = 0.1;
+	} until (!lei('ls-label') || $lei_out =~ /\butf8\b/ || now > $end);
+	like($lei_out, qr/\butf8\b/, 'commit-delay eventually commits');
 });
 done_testing;

^ permalink raw reply related	[flat|nested] 10+ messages in thread

end of thread, other threads:[~2023-10-11  7:20 UTC | newest]

Thread overview: 10+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-10-11  7:20 [PATCH 0/9] lei + import-related updates Eric Wong
2023-10-11  7:20 ` [PATCH 1/9] lei rediff: use ProcessIO for --drq support Eric Wong
2023-10-11  7:20 ` [PATCH 2/9] lei_xsearch: improve curl progress reporting Eric Wong
2023-10-11  7:20 ` [PATCH 3/9] msgtime: quiet warnings we can do nothing about Eric Wong
2023-10-11  7:20 ` [PATCH 4/9] msgtime: simplify msg_timestamp and msg_datestamp Eric Wong
2023-10-11  7:20 ` [PATCH 5/9] treewide: consolidate "From " line removal Eric Wong
2023-10-11  7:20 ` [PATCH 6/9] import: switch to Unix stream socket for fast-import Eric Wong
2023-10-11  7:20 ` [PATCH 7/9] import: cat_blob is a no-op w/o live fast-import Eric Wong
2023-10-11  7:20 ` [PATCH 8/9] lei blob: run cat_blob on lei/store for pending blobs Eric Wong
2023-10-11  7:20 ` [PATCH 9/9] lei import|tag|rm: support --commit-delay=SECONDS Eric Wong

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).